private WriteResult continueExpandTyped()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java [3731:3971]


    /**
     * Applies the sink transform that corresponds to the requested write {@code method}.
     *
     * <p>{@code STREAMING_INSERTS} applies a {@code StreamingInserts} transform, {@code FILE_LOADS}
     * applies a {@code BatchLoads} transform, and both {@code STORAGE_WRITE_API} and {@code
     * STORAGE_API_AT_LEAST_ONCE} apply a {@code StorageApiLoads} transform. Before building the
     * transform, each path rejects (through {@code checkArgument}) the options it does not support.
     *
     * @param input the elements to write, keyed by destination
     * @param elementCoder coder for the written elements; passed to the streaming-insert and
     *     file-load sinks
     * @param elementSchema schema handed to the Beam-row Storage API destinations; only read when
     *     {@code getUseBeamSchema()} is true
     * @param elementToRowFunction element-to-{@code Row} conversion handed to the Beam-row Storage
     *     API destinations; only read when {@code getUseBeamSchema()} is true
     * @param destinationCoder coder for destinations; passed to the file-load and Storage API sinks
     * @param dynamicDestinations dynamic destinations object passed to every sink
     * @param rowWriterFactory row writer factory; cast to its table-row or Avro variant depending on
     *     the path taken
     * @param method the write method to use
     * @return the result of applying the selected sink to {@code input}
     * @throws RuntimeException if {@code method} is none of the four methods handled here
     */
    private <DestinationT> WriteResult continueExpandTyped(
        PCollection<KV<DestinationT, T>> input,
        Coder<T> elementCoder,
        @Nullable Schema elementSchema,
        @Nullable SerializableFunction<T, Row> elementToRowFunction,
        Coder<DestinationT> destinationCoder,
        DynamicDestinations<T, DestinationT> dynamicDestinations,
        RowWriterFactory<T, DestinationT> rowWriterFactory,
        Write.Method method) {

      // ---------------------------------------------------------------------------------------
      // Streaming inserts.
      // ---------------------------------------------------------------------------------------
      if (method == Write.Method.STREAMING_INSERTS) {
        checkArgument(
            getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
            "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection.");
        // When no retry policy was configured, fall back to the always-retry policy.
        InsertRetryPolicy insertRetryPolicy =
            MoreObjects.firstNonNull(getFailedInsertRetryPolicy(), InsertRetryPolicy.alwaysRetry());

        checkArgument(
            rowWriterFactory.getOutputType() == OutputType.JsonTableRow,
            "Avro output is not supported when method == STREAMING_INSERTS");
        checkArgument(
            getSchemaUpdateOptions() == null || getSchemaUpdateOptions().isEmpty(),
            "SchemaUpdateOptions are not supported when method == STREAMING_INSERTS");
        checkArgument(
            !getPropagateSuccessfulStorageApiWrites(),
            "withPropagateSuccessfulStorageApiWrites only supported when using storage api writes.");
        checkArgument(
            getBadRecordRouter() instanceof ThrowingBadRecordRouter,
            "Error Handling is not supported with STREAMING_INSERTS");

        // The output-type check above guards this downcast to the table-row factory.
        RowWriterFactory.TableRowWriterFactory<T, DestinationT> jsonWriterFactory =
            (RowWriterFactory.TableRowWriterFactory<T, DestinationT>) rowWriterFactory;
        StreamingInserts<DestinationT, T> streamingSink =
            new StreamingInserts<>(
                    getCreateDisposition(),
                    dynamicDestinations,
                    elementCoder,
                    jsonWriterFactory.getToRowFn(),
                    jsonWriterFactory.getToFailsafeRowFn())
                .withInsertRetryPolicy(insertRetryPolicy)
                .withTestServices(getBigQueryServices())
                .withExtendedErrorInfo(getExtendedErrorInfo())
                .withSkipInvalidRows(getSkipInvalidRows())
                .withIgnoreUnknownValues(getIgnoreUnknownValues())
                .withIgnoreInsertIds(getIgnoreInsertIds())
                .withAutoSharding(getAutoSharding())
                .withSuccessfulInsertsPropagation(getPropagateSuccessful())
                .withDeterministicRecordIdFn(getDeterministicRecordIdFn())
                .withKmsKey(getKmsKey());
        return input.apply(streamingSink);
      }

      // ---------------------------------------------------------------------------------------
      // Load jobs.
      // ---------------------------------------------------------------------------------------
      if (method == Write.Method.FILE_LOADS) {
        checkArgument(
            getFailedInsertRetryPolicy() == null,
            "Record-insert retry policies are not supported when using BigQuery load jobs.");
        // Only inspects the output type when Avro logical types were requested.
        checkArgument(
            !getUseAvroLogicalTypes()
                || rowWriterFactory.getOutputType() == OutputType.AvroGenericRecord,
            "useAvroLogicalTypes can only be set with Avro output.");
        checkArgument(
            !getPropagateSuccessfulStorageApiWrites(),
            "withPropagateSuccessfulStorageApiWrites only supported when using storage api writes.");

        // A non-throwing bad record router is tolerated on this path, but only with a warning.
        boolean customErrorHandling = !(getBadRecordRouter() instanceof ThrowingBadRecordRouter);
        if (customErrorHandling) {
          LOG.warn(
              "Error Handling is partially supported when using FILE_LOADS. Consider using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE");
        }

        // Batch loads treat wrapped JSON string values differently than the other write methods,
        // so warn when the (accessible) table schema declares a JSON-typed field.
        if (getJsonSchema() != null && getJsonSchema().isAccessible()) {
          JsonElement parsedSchema = JsonParser.parseString(getJsonSchema().get());
          boolean jsonTypeDeclared =
              !parsedSchema.getAsJsonObject().keySet().isEmpty()
                  && hasJsonTypeInSchema(parsedSchema);
          if (jsonTypeDeclared) {
            if (rowWriterFactory.getOutputType() == OutputType.JsonTableRow) {
              LOG.warn(
                  "Found JSON type in TableSchema for 'FILE_LOADS' write method. \n"
                      + "Make sure the TableRow value is a Jackson JsonNode to ensure the read as a "
                      + "JSON type. Otherwise it will read as a raw (escaped) string.\n"
                      + "See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json#limitations "
                      + "for limitations.");
            } else if (rowWriterFactory.getOutputType() == OutputType.AvroGenericRecord) {
              LOG.warn(
                  "Found JSON type in TableSchema for 'FILE_LOADS' write method. \n"
                      + " check steps in https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#extract_json_data_from_avro_data "
                      + " to ensure the read as a JSON type. Otherwise it will read as a raw "
                      + "(escaped) string.");
            }
          }
        }

        BatchLoads<DestinationT, T> loadJobs =
            new BatchLoads<>(
                getWriteDisposition(),
                getCreateDisposition(),
                getJsonTableRef() != null,
                dynamicDestinations,
                destinationCoder,
                getCustomGcsTempLocation(),
                getLoadJobProjectId(),
                getIgnoreUnknownValues(),
                elementCoder,
                rowWriterFactory,
                getKmsKey(),
                getJsonClustering() != null,
                getUseAvroLogicalTypes(),
                getWriteTempDataset(),
                getBadRecordRouter(),
                getBadRecordErrorHandler());
        loadJobs.setTestServices(getBigQueryServices());

        // Optional settings are only forwarded when they were explicitly configured.
        if (getSchemaUpdateOptions() != null) {
          loadJobs.setSchemaUpdateOptions(getSchemaUpdateOptions());
        }
        if (getMaxFilesPerBundle() != null) {
          loadJobs.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
        }
        if (getMaxFileSize() != null) {
          loadJobs.setMaxFileSize(getMaxFileSize());
        }
        loadJobs.setMaxFilesPerPartition(getMaxFilesPerPartition());
        loadJobs.setMaxBytesPerPartition(getMaxBytesPerPartition());

        // Failing a bundle is expensive in streaming (unbounded) mode, so only unbounded inputs
        // get the configured load-job retry limit forwarded to the sink.
        if (input.isBounded() == IsBounded.UNBOUNDED) {
          loadJobs.setMaxRetryJobs(getMaxRetryJobs());
        }
        loadJobs.setTriggeringFrequency(getTriggeringFrequency());

        // With auto-sharding enabled the shard count is passed as 0 instead of the configured one.
        if (!getAutoSharding()) {
          loadJobs.setNumFileShards(getNumFileShards());
        } else {
          loadJobs.setNumFileShards(0);
        }
        return input.apply(loadJobs);
      }

      // ---------------------------------------------------------------------------------------
      // Storage Write API (exactly-once and at-least-once). Anything else is unexpected.
      // ---------------------------------------------------------------------------------------
      if (method != Method.STORAGE_WRITE_API && method != Method.STORAGE_API_AT_LEAST_ONCE) {
        throw new RuntimeException("Unexpected write method " + method);
      }

      BigQueryOptions bigQueryOptions =
          input.getPipeline().getOptions().as(BigQueryOptions.class);

      // Pick the destinations implementation that matches how the elements are represented.
      StorageApiDynamicDestinations<T, DestinationT> storageDestinations;
      if (getUseBeamSchema()) {
        // Beam rows are translated directly into protos for Storage API writes, so there is no
        // need to round trip through JSON TableRow objects.
        storageDestinations =
            new StorageApiDynamicDestinationsBeamRow<>(
                dynamicDestinations,
                elementSchema,
                elementToRowFunction,
                getFormatRecordOnFailureFunction(),
                getRowMutationInformationFn() != null);
      } else if (getWriteProtosClass() != null && getDirectWriteProtos()) {
        // The options rejected below could be supported by falling back to
        // StorageApiDynamicDestinationsTableRow, but that would defeat the optimization (a new
        // dynamic proto message would have to be created and the data copied over). For now the
        // user is simply given a way to disable the optimization themselves.
        checkArgument(
            getRowMutationInformationFn() == null,
            "Row upserts and deletes are not for direct proto writes. "
                + "Try setting withDirectWriteProtos(false)");
        checkArgument(
            !getAutoSchemaUpdate(),
            "withAutoSchemaUpdate not supported when using writeProtos."
                + " Try setting withDirectWriteProtos(false)");
        checkArgument(
            !getIgnoreUnknownValues(),
            "ignoreUnknownValues not supported when using writeProtos."
                + " Try setting withDirectWriteProtos(false)");
        storageDestinations =
            (StorageApiDynamicDestinations<T, DestinationT>)
                new StorageApiDynamicDestinationsProto(
                    dynamicDestinations,
                    getWriteProtosClass(),
                    getFormatRecordOnFailureFunction());
      } else if (getAvroRowWriterFactory() != null) {
        // The Avro to Storage Write API proto converter can be configured here, assuming the
        // format function returns an Avro GenericRecord and a schema is defined.
        checkArgument(
            getJsonSchema() != null
                || getDynamicDestinations() != null
                || getSchemaFromView() != null,
            "A schema must be provided for avro rows to be used with StorageWrite API.");

        RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT> avroWriterFactory =
            (RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>)
                rowWriterFactory;
        // Use the default schema factory unless a custom one was configured.
        SerializableFunction<@Nullable TableSchema, org.apache.avro.Schema> tableToAvroSchema =
            Optional.ofNullable(getAvroSchemaFactory()).orElse(DEFAULT_AVRO_SCHEMA_FACTORY);

        storageDestinations =
            new StorageApiDynamicDestinationsGenericRecord<>(
                dynamicDestinations,
                tableToAvroSchema,
                avroWriterFactory.getToAvroFn(),
                getFormatRecordOnFailureFunction(),
                getRowMutationInformationFn() != null);
      } else {
        // Fallback behavior: convert to JSON TableRows and convert those into Beam TableRows.
        RowWriterFactory.TableRowWriterFactory<T, DestinationT> jsonWriterFactory =
            (RowWriterFactory.TableRowWriterFactory<T, DestinationT>) rowWriterFactory;
        storageDestinations =
            new StorageApiDynamicDestinationsTableRow<>(
                dynamicDestinations,
                jsonWriterFactory.getToRowFn(),
                getFormatRecordOnFailureFunction(),
                getRowMutationInformationFn() != null,
                getCreateDisposition(),
                getIgnoreUnknownValues(),
                getAutoSchemaUpdate());
      }

      // A stream count of zero turns auto-sharding on even if it was not requested explicitly.
      int requestedStreams = getStorageApiNumStreams(bigQueryOptions);
      boolean autoShardingRequested = getAutoSharding();
      boolean enableAutoSharding = autoShardingRequested || requestedStreams == 0;

      StorageApiLoads<DestinationT, T> storageSink =
          new StorageApiLoads<>(
              destinationCoder,
              storageDestinations,
              getRowMutationInformationFn(),
              getCreateDisposition(),
              getKmsKey(),
              getStorageApiTriggeringFrequency(bigQueryOptions),
              getBigQueryServices(),
              getStorageApiNumStreams(bigQueryOptions),
              method == Method.STORAGE_API_AT_LEAST_ONCE,
              enableAutoSharding,
              getAutoSchemaUpdate(),
              getIgnoreUnknownValues(),
              getPropagateSuccessfulStorageApiWrites(),
              getPropagateSuccessfulStorageApiWritesPredicate(),
              getRowMutationInformationFn() != null,
              getDefaultMissingValueInterpretation(),
              getBigLakeConfiguration(),
              getBadRecordRouter(),
              getBadRecordErrorHandler());
      return input.apply("StorageApiLoads", storageSink);
    }