public void process()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java [451:976]


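    // Processes one sharded batch of rows for a single destination: resolves the target table,
    // obtains (or creates) the write stream and its cached StreamAppendClient, splits the
    // payloads into size-bounded ProtoRows batches, appends them with retries, and emits flush
    // operations plus failed (and optionally successful) rows to the appropriate outputs.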
    public void process(
        ProcessContext c,
        final PipelineOptions pipelineOptions,
        @Element KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>> element,
        @Timestamp org.joda.time.Instant elementTs,
        final @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
        final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
        final @StateId("updatedSchema") ValueState<TableSchema> updatedSchema,
        @TimerId("idleTimer") Timer idleTimer,
        final MultiOutputReceiver o)
        throws Exception {
      BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);

      if (autoUpdateSchema) {
        updatedSchema.readLater();
      }

      dynamicDestinations.setSideInputAccessorFromProcessContext(c);
      TableDestination tableDestination =
          destinations.computeIfAbsent(
              element.getKey().getKey(),
              dest -> {
                TableDestination tableDestination1 = dynamicDestinations.getTable(dest);
                checkArgument(
                    tableDestination1 != null,
                    "DynamicDestinations.getTable() may not return null, "
                        + "but %s returned null for destination %s",
                    dynamicDestinations,
                    dest);
                return tableDestination1;
              });
      final String tableId = tableDestination.getTableUrn(bigQueryOptions);
      final String shortTableId = tableDestination.getShortTableUrn();
      final DatasetService datasetService = getDatasetService(pipelineOptions);
      final WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions);

      Lineage.getSinks()
          .add(
              "bigquery",
              BigQueryHelpers.dataCatalogSegments(
                  tableDestination.getTableReference(), bigQueryOptions));

      Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder();
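      // Lazily creates the destination table (honoring the create disposition) the first time a
      // stream is needed, and again from onError when an append suggests the table is missing.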
      Callable<Boolean> tryCreateTable =
          () -> {
            DestinationT dest = element.getKey().getKey();
            CreateTableHelpers.possiblyCreateTable(
                c.getPipelineOptions().as(BigQueryOptions.class),
                tableDestination,
                () -> dynamicDestinations.getSchema(dest),
                () -> dynamicDestinations.getTableConstraints(dest),
                createDisposition,
                destinationCoder,
                kmsKey,
                bqServices,
                bigLakeConfiguration);
            return true;
          };

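      // Supplier for the write stream backing this destination shard; it delegates to the
      // getOrCreateStream helper, which is handed the stream name/offset state, the idle timer,
      // and the tryCreateTable callable.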
      Supplier<String> getOrCreateStream =
          () ->
              getOrCreateStream(
                  tableId, streamName, streamOffset, idleTimer, writeStreamService, tryCreateTable);
      Callable<AppendClientInfo> getAppendClientInfo =
          () -> {
            @Nullable TableSchema tableSchema;
            DescriptorProtos.DescriptorProto descriptor;
            TableSchema updatedSchemaValue = updatedSchema.read();
            if (autoUpdateSchema && updatedSchemaValue != null) {
              // We've seen an updated schema, so we use that instead of querying the
              // MessageConverter.
              tableSchema = updatedSchemaValue;
              descriptor =
                  TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
                      tableSchema, true, false);
            } else {
              // Start off with the base schema. As we get notified of schema updates, we
              // will update the descriptor.
              StorageApiDynamicDestinations.MessageConverter<?> converter =
                  messageConverters.get(
                      element.getKey().getKey(), dynamicDestinations, datasetService);
              tableSchema = converter.getTableSchema();
              descriptor = converter.getDescriptor(false);

              if (autoUpdateSchema) {
                // A StreamWriter ignores table schema updates that happen prior to its creation.
                // So before creating a StreamWriter below, we fetch the table schema to check if
                // we missed an update. If so, use the new schema instead of the base schema.
                @Nullable
                TableSchema streamSchema =
                    MoreObjects.firstNonNull(
                        writeStreamService.getWriteStreamSchema(getOrCreateStream.get()),
                        TableSchema.getDefaultInstance());
                Optional<TableSchema> newSchema =
                    TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema);

                if (newSchema.isPresent()) {
                  tableSchema = newSchema.get();
                  descriptor =
                      TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
                          tableSchema, true, false);
                  updatedSchema.write(tableSchema);
                }
              }
            }
            AppendClientInfo info =
                AppendClientInfo.of(
                        Preconditions.checkStateNotNull(tableSchema),
                        descriptor,
                        // Make sure that the client is always closed in a different thread
                        // to avoid blocking.
                        client ->
                            runAsyncIgnoreFailure(
                                closeWriterExecutor,
                                () -> {
                                  // Remove the pin that is "owned" by the cache.
                                  client.unpin();
                                  client.close();
                                }))
                    .withAppendClient(
                        writeStreamService,
                        getOrCreateStream,
                        false,
                        defaultMissingValueInterpretation);
            // This pin is "owned" by the cache.
            Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin();
            return info;
          };

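      // Look up (or build) the cached AppendClientInfo for this sharded key, then verify that the
      // cached client's stream still matches the stream name persisted in state; if not, rebuild
      // the cache entry.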
      AtomicReference<AppendClientInfo> appendClientInfo =
          new AtomicReference<>(APPEND_CLIENTS.get(element.getKey(), getAppendClientInfo));
      String currentStream = getOrCreateStream.get();
      if (!currentStream.equals(appendClientInfo.get().getStreamName())) {
        // Cached append client is inconsistent with persisted state. Throw away the cached item
        // and force it to be recreated.
        APPEND_CLIENTS.invalidate(element.getKey());
        appendClientInfo.set(APPEND_CLIENTS.get(element.getKey(), getAppendClientInfo));
      }

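      // If a schema update has been persisted in state and differs from the cached client's
      // schema, rebuild the cached AppendClientInfo against the updated schema.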
      TableSchema updatedSchemaValue = updatedSchema.read();
      if (autoUpdateSchema && updatedSchemaValue != null) {
        if (appendClientInfo.get().hasSchemaChanged(updatedSchemaValue)) {
          appendClientInfo.set(
              AppendClientInfo.of(
                  updatedSchemaValue, appendClientInfo.get().getCloseAppendClient(), false));
          APPEND_CLIENTS.invalidate(element.getKey());
          APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
        }
      }

      // Each ProtoRows object contains at most 1MB of rows.
      // TODO: Push messageFromTableRow up to top level. That way we can skip TableRow entirely if
      // already proto or already schema.
      Iterable<SplittingIterable.Value> messages =
          new SplittingIterable(
              element.getValue(),
              splitSize,
              (fields, ignore) -> appendClientInfo.get().encodeUnknownFields(fields, ignore),
              bytes -> appendClientInfo.get().toTableRow(bytes, Predicates.alwaysTrue()),
              (failedRow, errorMessage) -> {
                o.get(failedRowsTag)
                    .outputWithTimestamp(
                        new BigQueryStorageApiInsertError(failedRow.getValue(), errorMessage),
                        failedRow.getTimestamp());
                rowsSentToFailedRowsCollection.inc();
                BigQuerySinkMetrics.appendRowsRowStatusCounter(
                        BigQuerySinkMetrics.RowStatus.FAILED,
                        BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
                        shortTableId)
                    .inc(1);
              },
              autoUpdateSchema,
              ignoreUnknownValues,
              elementTs);

      // Initialize stream names and offsets for all contexts. This will be called initially, but
      // will also be called if we roll over to a new stream on a retry.
      BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts =
          (contexts, isFailure) -> {
            try {
              if (isFailure) {
                // Clear the stream name, forcing a new one to be created.
                streamName.write("");
              }
              appendClientInfo.set(
                  appendClientInfo
                      .get()
                      .withAppendClient(
                          writeStreamService,
                          getOrCreateStream,
                          false,
                          defaultMissingValueInterpretation));
              StreamAppendClient streamAppendClient =
                  Preconditions.checkArgumentNotNull(
                      appendClientInfo.get().getStreamAppendClient());
              String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());
              long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read());
              for (AppendRowsContext context : contexts) {
                context.streamName = streamNameRead;
                streamAppendClient.pin();
                context.client = appendClientInfo.get().getStreamAppendClient();
                context.offset = currentOffset;
                ++context.tryIteration;
                currentOffset = context.offset + context.protoRows.getSerializedRowsCount();
              }
              streamOffset.write(currentOffset);
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          };

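      // Drop the cached append client for this key and asynchronously unpin each context's client
      // so the underlying connections can be closed and later reopened.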
      Consumer<Iterable<AppendRowsContext>> clearClients =
          contexts -> {
            APPEND_CLIENTS.invalidate(element.getKey());
            appendClientInfo.set(appendClientInfo.get().withNoAppendClient());
            APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
            for (AppendRowsContext context : contexts) {
              if (context.client != null) {
                // Unpin in a different thread, as it may execute a blocking close.
                runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
                context.client = null;
              }
            }
          };

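      // Issues a single AppendRows RPC for one context, (re)attaching an append client first.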
      Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> runOperation =
          context -> {
            if (context.protoRows.getSerializedRowsCount() == 0) {
              // This might happen if all rows in a batch failed and were sent to the failed-rows
              // PCollection.
              return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
            }
            try {
              appendClientInfo.set(
                  appendClientInfo
                      .get()
                      .withAppendClient(
                          writeStreamService,
                          getOrCreateStream,
                          false,
                          defaultMissingValueInterpretation));
              return Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient())
                  .appendRows(context.offset, context.protoRows);
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          };

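      // Error handler invoked by the RetryManager: routes permanently failed rows to the
      // failed-rows output, recreates streams and clients where needed, and returns how the
      // remaining operations should be retried.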
      Function<Iterable<AppendRowsContext>, RetryType> onError =
          failedContexts -> {
            // The first context is always the one that fails.
            AppendRowsContext failedContext =
                Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
            BigQuerySinkMetrics.reportFailedRPCMetrics(
                failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId);
            String errorCode =
                BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());

            // AppendSerializationError means that BigQuery detected errors on individual rows,
            // e.g. a row not conforming to BigQuery invariants. These errors are persistent, so we
            // redirect those rows to the failed-rows PCollection and retry with the remaining rows.
            if (failedContext.getError() != null
                && failedContext.getError() instanceof Exceptions.AppendSerializationError) {
              Exceptions.AppendSerializationError error =
                  Preconditions.checkArgumentNotNull(
                      (Exceptions.AppendSerializationError) failedContext.getError());

              Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
              for (int failedIndex : failedRowIndices) {
                // Convert the message to a TableRow and send it to the failedRows collection.
                TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex);
                if (failedRow == null) {
                  ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex);
                  failedRow =
                      appendClientInfo.get().toTableRow(protoBytes, Predicates.alwaysTrue());
                }
                org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex);
                o.get(failedRowsTag)
                    .outputWithTimestamp(
                        new BigQueryStorageApiInsertError(
                            failedRow, error.getRowIndexToErrorMessage().get(failedIndex)),
                        timestamp);
              }
              int failedRows = failedRowIndices.size();
              rowsSentToFailedRowsCollection.inc(failedRows);
              BigQuerySinkMetrics.appendRowsRowStatusCounter(
                      BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableId)
                  .inc(failedRows);

              // Remove the failed rows from the payload so that we retry the batch without them.
              ProtoRows.Builder retryRows = ProtoRows.newBuilder();
              @Nullable List<org.joda.time.Instant> timestamps = Lists.newArrayList();
              for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); ++i) {
                if (!failedRowIndices.contains(i)) {
                  ByteString rowBytes = failedContext.protoRows.getSerializedRows(i);
                  retryRows.addSerializedRows(rowBytes);
                  timestamps.add(failedContext.timestamps.get(i));
                }
              }
              failedContext.protoRows = retryRows.build();
              failedContext.timestamps = timestamps;
              int retriedRows = failedContext.protoRows.getSerializedRowsCount();
              BigQuerySinkMetrics.appendRowsRowStatusCounter(
                      BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId)
                  .inc(retriedRows);

              // Since we removed rows, we need to update the insert offsets for all remaining rows.
              long offset = failedContext.offset;
              for (AppendRowsContext context : failedContexts) {
                context.offset = offset;
                offset += context.protoRows.getSerializedRowsCount();
              }
              streamOffset.write(offset);
              return RetryType.RETRY_ALL_OPERATIONS;
            }

            Throwable error = Preconditions.checkStateNotNull(failedContext.getError());
            Status.Code statusCode = Status.fromThrowable(error).getCode();

            // This means that the offset we have stored does not match the current end of
            // the stream in the Storage API. Usually this happens because a crash or a bundle
            // failure happened after an append but before the worker could checkpoint its
            // state. The records that were appended in a failed bundle will be retried,
            // meaning that the unflushed tail of the stream must be discarded to prevent
            // duplicates.
            boolean offsetMismatch =
                statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS);

            boolean quotaError = statusCode.equals(Code.RESOURCE_EXHAUSTED);
            if (!offsetMismatch) {
              // Don't log errors for expected offset mismatch. These will be logged as warnings
              // below.
              LOG.error(
                  "Got error " + failedContext.getError() + " closing " + failedContext.streamName);
            }

            try {
              // TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces
              // them.
              tryCreateTable.call();
            } catch (Exception e) {
              throw new RuntimeException(e);
            }

            if (!quotaError) {
              // This forces us to close and reopen all gRPC connections to Storage API on error,
              // which empirically fixes random stuckness issues.
              clearClients.accept(failedContexts);
            }
            appendFailures.inc();
            int retriedRows = failedContext.protoRows.getSerializedRowsCount();
            BigQuerySinkMetrics.appendRowsRowStatusCounter(
                    BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId)
                .inc(retriedRows);

            boolean explicitStreamFinalized =
                failedContext.getError() instanceof StreamFinalizedException;
            // This implies that the stream doesn't exist or has already been finalized. In this
            // case we have no choice but to create a new stream.
            boolean streamDoesNotExist =
                explicitStreamFinalized
                    || statusCode.equals(Code.INVALID_ARGUMENT)
                    || statusCode.equals(Code.NOT_FOUND)
                    || statusCode.equals(Code.FAILED_PRECONDITION);
            if (offsetMismatch || streamDoesNotExist) {
              appendOffsetFailures.inc();
              LOG.warn(
                  "Append to "
                      + failedContext
                      + " failed with "
                      + failedContext.getError()
                      + ". Will retry with a new stream.");
              // Finalize the stream and clear streamName so a new stream will be created.
              o.get(flushTag)
                  .output(
                      KV.of(
                          failedContext.streamName, new Operation(failedContext.offset - 1, true)));
              // Reinitialize all contexts with the new stream and new offsets.
              initializeContexts.accept(failedContexts, true);

              // Offset failures imply that all subsequent parallel appends will also fail.
              // Retry them all.
              return RetryType.RETRY_ALL_OPERATIONS;
            }

            return RetryType.RETRY_ALL_OPERATIONS;
          };

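      // Success handler: emit a flush request up to the last appended offset and, if requested,
      // output the successfully appended rows.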
      Consumer<AppendRowsContext> onSuccess =
          context -> {
            AppendRowsResponse response = Preconditions.checkStateNotNull(context.getResult());
            o.get(flushTag)
                .output(
                    KV.of(
                        context.streamName,
                        new Operation(
                            context.offset + context.protoRows.getSerializedRowsCount() - 1,
                            false)));
            int flushedRows = context.protoRows.getSerializedRowsCount();
            flushesScheduled.inc(flushedRows);
            BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
                context, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId);
            BigQuerySinkMetrics.appendRowsRowStatusCounter(
                    BigQuerySinkMetrics.RowStatus.SUCCESSFUL, BigQuerySinkMetrics.OK, shortTableId)
                .inc(flushedRows);

            if (successfulRowsTag != null) {
              for (int i = 0; i < context.protoRows.getSerializedRowsCount(); ++i) {
                ByteString protoBytes = context.protoRows.getSerializedRows(i);
                org.joda.time.Instant timestamp = context.timestamps.get(i);
                o.get(successfulRowsTag)
                    .outputWithTimestamp(
                        appendClientInfo.get().toTableRow(protoBytes, successfulRowsPredicate),
                        timestamp);
              }
            }
          };
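      // Walk the split batches: rows exceeding the maximum request size go straight to the
      // failed-rows output; everything else is registered as an append operation with the
      // RetryManager.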
      Instant now = Instant.now();
      List<AppendRowsContext> contexts = Lists.newArrayList();
      RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
          new RetryManager<>(
              Duration.standardSeconds(1),
              Duration.standardSeconds(20),
              maxRetries,
              BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
      int numAppends = 0;
      for (SplittingIterable.Value splitValue : messages) {
        // Handle the case of a row that is too large.
        if (splitValue.getProtoRows().getSerializedSize() >= maxRequestSize) {
          if (splitValue.getProtoRows().getSerializedRowsCount() > 1) {
            // TODO(reuvenlax): Is it worth trying to handle this case by splitting the protoRows?
            // Given that we split the ProtoRows iterable at 2MB and the max request size is 10MB,
            // this scenario seems nearly impossible.
            LOG.error(
                "A request containing more than one row is over the request size limit of "
                    + maxRequestSize
                    + ". This is unexpected. All rows in the request will be sent to the failed-rows PCollection.");
          }
          for (int i = 0; i < splitValue.getProtoRows().getSerializedRowsCount(); ++i) {
            org.joda.time.Instant timestamp = splitValue.getTimestamps().get(i);
            TableRow failedRow = splitValue.getFailsafeTableRows().get(i);
            if (failedRow == null) {
              ByteString rowBytes = splitValue.getProtoRows().getSerializedRows(i);
              failedRow = appendClientInfo.get().toTableRow(rowBytes, Predicates.alwaysTrue());
            }
            o.get(failedRowsTag)
                .outputWithTimestamp(
                    new BigQueryStorageApiInsertError(
                        failedRow, "Row payload too large. Maximum size " + maxRequestSize),
                    timestamp);
          }
          int numRowsFailed = splitValue.getProtoRows().getSerializedRowsCount();
          rowsSentToFailedRowsCollection.inc(numRowsFailed);
          BigQuerySinkMetrics.appendRowsRowStatusCounter(
                  BigQuerySinkMetrics.RowStatus.FAILED,
                  BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
                  shortTableId)
              .inc(numRowsFailed);
        } else {
          ++numAppends;
          // Register this batch with the RetryManager.
          AppendRowsContext context =
              new AppendRowsContext(
                  element.getKey(),
                  splitValue.getProtoRows(),
                  splitValue.getTimestamps(),
                  splitValue.getFailsafeTableRows());
          contexts.add(context);
          retryManager.addOperation(runOperation, onError, onSuccess, context);
          recordsAppended.inc(splitValue.getProtoRows().getSerializedRowsCount());
          appendSizeDistribution.update(context.protoRows.getSerializedRowsCount());
        }
      }

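      // Run the registered appends (with retries), making sure any remaining pins are released,
      // then pick up any schema update reported by the stream and record append latency.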
      if (numAppends > 0) {
        initializeContexts.accept(contexts, false);
        try {
          retryManager.run(true);
        } finally {
          // Make sure that all pins are removed.
          for (AppendRowsContext context : contexts) {
            if (context.client != null) {
              runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
            }
          }
        }
        appendSplitDistribution.update(numAppends);

        if (autoUpdateSchema) {
          @Nullable
          StreamAppendClient streamAppendClient = appendClientInfo.get().getStreamAppendClient();
          TableSchema originalSchema = appendClientInfo.get().getTableSchema();
          @Nullable
          TableSchema updatedSchemaReturned =
              (streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null;
          // Update the table schema and clear the append client.
          if (updatedSchemaReturned != null) {
            Optional<TableSchema> newSchema =
                TableSchemaUpdateUtils.getUpdatedSchema(originalSchema, updatedSchemaReturned);
            if (newSchema.isPresent()) {
              appendClientInfo.set(
                  AppendClientInfo.of(
                      newSchema.get(), appendClientInfo.get().getCloseAppendClient(), false));
              APPEND_CLIENTS.invalidate(element.getKey());
              APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
              LOG.debug(
                  "Fetched updated schema for table {}:\n\t{}", tableId, updatedSchemaReturned);
              updatedSchema.write(newSchema.get());
            }
          }
        }

        java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now());
        appendLatencyDistribution.update(timeElapsed.toMillis());
      }
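      // Reset the idle timer on each processed element; if no further data arrives within
      // streamIdleTime, the timer fires so the idle stream can be cleaned up by the corresponding
      // timer callback (not shown in this excerpt).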
      idleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
    }