public WriteResult expand()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java [3397:3592]


    /**
     * Validates the configuration of this write against {@code input} and then delegates the
     * actual expansion to {@code expandTyped}.
     *
     * <p>The checks run in this order, and the first failing {@code checkArgument} aborts the
     * expansion:
     *
     * <ol>
     *   <li>exactly one of jsonTableRef, tableFunction or dynamicDestinations is set;
     *   <li>at most one of jsonSchema, schemaFromView or dynamicDestinations is set;
     *   <li>triggering frequency, file shard count and auto-sharding are compatible with the
     *       resolved write method and with the boundedness of {@code input};
     *   <li>options that only apply to the Storage Write API methods are not used with other
     *       methods;
     *   <li>row-mutation, primary-key, auto-schema-update and time-partitioning options are
     *       mutually consistent.
     * </ol>
     *
     * <p>Several incompatible-but-harmless settings are only logged (warn/error) instead of being
     * rejected.
     *
     * <p>When no {@code DynamicDestinations} was supplied, one is built from the table reference or
     * the table function and then wrapped, in order, with the schema, the time-partitioning /
     * clustering and the primary-key constraints that were configured.
     *
     * @param input the collection to be written
     * @return whatever {@code expandTyped(input, dynamicDestinations)} returns
     * @throws IllegalArgumentException if any precondition fails (assuming {@code checkArgument} is
     *     Guava's {@code Preconditions.checkArgument})
     */
    public WriteResult expand(PCollection<T> input) {
      // We must have a destination to write to!
      checkArgument(
          getTableFunction() != null
              || getJsonTableRef() != null
              || getDynamicDestinations() != null,
          "must set the table reference of a BigQueryIO.Write transform");

      // Count the destination sources that are non-null: exactly one of them must be set.
      List<?> allToArgs =
          Lists.newArrayList(getJsonTableRef(), getTableFunction(), getDynamicDestinations());
      checkArgument(
          1
              == Iterables.size(
                  allToArgs.stream()
                      .filter(Predicates.notNull()::apply)
                      .collect(Collectors.toList())),
          "Exactly one of jsonTableRef, tableFunction, or dynamicDestinations must be set");

      // Count the schema sources that are non-null: zero or one is allowed (fewer than two).
      List<?> allSchemaArgs =
          Lists.newArrayList(getJsonSchema(), getSchemaFromView(), getDynamicDestinations());
      checkArgument(
          2
              > Iterables.size(
                  allSchemaArgs.stream()
                      .filter(Predicates.notNull()::apply)
                      .collect(Collectors.toList())),
          "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may be set");

      // Perform some argument checks
      BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
      Write.Method method = resolveMethod(input);
      if (input.isBounded() == IsBounded.UNBOUNDED) {
        if (method == Write.Method.FILE_LOADS || method == Write.Method.STORAGE_WRITE_API) {
          // These two methods need a triggering frequency on unbounded input. STORAGE_WRITE_API
          // takes it from getStorageApiTriggeringFrequency(bqOptions); FILE_LOADS takes it from
          // the transform's own getTriggeringFrequency().
          Duration triggeringFrequency =
              (method == Write.Method.STORAGE_WRITE_API)
                  ? getStorageApiTriggeringFrequency(bqOptions)
                  : getTriggeringFrequency();
          checkArgument(
              triggeringFrequency != null,
              "When writing an unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API, "
                  + "triggering frequency must be specified");
        } else {
          // Any other method must not have a transform-level triggering frequency.
          checkArgument(
              getTriggeringFrequency() == null,
              "Triggering frequency can be specified only when writing via FILE_LOADS or STORAGE_WRITE_API, but the method was %s.",
              method);
        }
        if (method != Method.FILE_LOADS) {
          checkArgument(
              getNumFileShards() == 0,
              "Number of file shards can be specified only when writing via FILE_LOADS, but the method was %s.",
              method);
        }
        // A Storage API triggering frequency with the at-least-once method is tolerated but
        // reported, since it has no effect there.
        if (method == Method.STORAGE_API_AT_LEAST_ONCE
            && getStorageApiTriggeringFrequency(bqOptions) != null) {
          LOG.warn(
              "Storage API triggering frequency option will be ignored as it can only be specified "
                  + "when writing via STORAGE_WRITE_API, but the method was {}.",
              method);
        }
        if (getAutoSharding()) {
          // Auto-sharding wins over an explicit stream/shard count; it is ignored (with a
          // warning) for the at-least-once method.
          if (method == Method.STORAGE_WRITE_API && getStorageApiNumStreams(bqOptions) > 0) {
            LOG.warn(
                "Both numStorageWriteApiStreams and auto-sharding options are set. Will default to auto-sharding."
                    + " To set a fixed number of streams, do not enable auto-sharding.");
          } else if (method == Method.FILE_LOADS && getNumFileShards() > 0) {
            LOG.warn(
                "Both numFileShards and auto-sharding options are set. Will default to auto-sharding."
                    + " To set a fixed number of file shards, do not enable auto-sharding.");
          } else if (method == Method.STORAGE_API_AT_LEAST_ONCE) {
            LOG.warn(
                "The setting of auto-sharding is ignored. It is only supported when writing an"
                    + " unbounded PCollection via FILE_LOADS, STREAMING_INSERTS or"
                    + " STORAGE_WRITE_API, but the method was {}.",
                method);
          }
        }
      } else { // PCollection is bounded
        // Shared suffix for every "unbounded-only option" message emitted in this branch.
        String error =
            String.format(
                " is only applicable to an unbounded PCollection, but the input PCollection is %s.",
                input.isBounded());
        // Transform-level settings are rejected outright...
        checkArgument(getTriggeringFrequency() == null, "Triggering frequency" + error);
        checkArgument(!getAutoSharding(), "Auto-sharding" + error);
        checkArgument(getNumFileShards() == 0, "Number of file shards" + error);

        // ...whereas the values obtained through bqOptions only produce a warning.
        if (getStorageApiTriggeringFrequency(bqOptions) != null) {
          LOG.warn("Setting a triggering frequency" + error);
        }
        if (getStorageApiNumStreams(bqOptions) != 0) {
          LOG.warn("Setting the number of Storage API streams" + error);
        }
      }

      if (method == Method.STORAGE_API_AT_LEAST_ONCE && getStorageApiNumStreams(bqOptions) != 0) {
        LOG.warn(
            "Setting a number of Storage API streams is only supported when using STORAGE_WRITE_API");
      }

      if (method != Method.STORAGE_WRITE_API && method != Method.STORAGE_API_AT_LEAST_ONCE) {
        // Options that exist only for the two Storage API methods.
        checkArgument(
            !getAutoSchemaUpdate(),
            "withAutoSchemaUpdate only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE.");
        checkArgument(
            getBigLakeConfiguration() == null,
            "bigLakeConfiguration is only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE.");
      } else {
        // WRITE_TRUNCATE is only reported here, not rejected.
        if (getWriteDisposition() == WriteDisposition.WRITE_TRUNCATE) {
          LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition.");
        }
        if (getBigLakeConfiguration() != null) {
          // Both keys are mandatory. The message is passed as a template so that it is only
          // formatted when the check actually fails.
          checkArgument(
              Arrays.stream(new String[] {CONNECTION_ID, STORAGE_URI})
                  .allMatch(getBigLakeConfiguration()::containsKey),
              "bigLakeConfiguration must contain keys '%s' and '%s'",
              CONNECTION_ID,
              STORAGE_URI);
        }
      }
      if (getRowMutationInformationFn() != null) {
        // NOTE(review): this and the primary-key check below test the configured getMethod(),
        // not the resolved `method` used everywhere above. If resolveMethod can turn a default
        // into a concrete method the two may disagree -- confirm this is intended.
        checkArgument(
            getMethod() == Method.STORAGE_API_AT_LEAST_ONCE,
            "When using row updates on BigQuery, StorageWrite API should execute using"
                + " \"at least once\" mode.");
        checkArgument(
            getCreateDisposition() == CreateDisposition.CREATE_NEVER || getPrimaryKey() != null,
            "If specifying CREATE_IF_NEEDED along with row updates, a primary key needs to be specified");
      }
      if (getPrimaryKey() != null) {
        checkArgument(
            getMethod() != Method.FILE_LOADS, "Primary key not supported when using FILE_LOADS");
      }

      if (getAutoSchemaUpdate()) {
        // TODO(reuvenlax): Remove this restriction once we implement support.
        checkArgument(
            getIgnoreUnknownValues(),
            "Auto schema update currently only supported when ignoreUnknownValues also set.");
        checkArgument(
            !getUseBeamSchema(), "Auto schema update not supported when using Beam schemas.");
      }

      if (getJsonTimePartitioning() != null) {
        // An explicit time partitioning is only accepted together with a constant table
        // reference; the other two destination sources are rejected.
        checkArgument(
            getDynamicDestinations() == null,
            "The supplied DynamicDestinations object can directly set TimePartitioning."
                + " There is no need to call BigQueryIO.Write.withTimePartitioning.");
        checkArgument(
            getTableFunction() == null,
            "The supplied getTableFunction object can directly set TimePartitioning."
                + " There is no need to call BigQueryIO.Write.withTimePartitioning.");
      }

      DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
      if (dynamicDestinations == null) {
        // The "exactly one destination" check above guarantees that one of the two branches
        // below assigns a non-null value.
        if (getJsonTableRef() != null) {
          dynamicDestinations =
              DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef(
                  getJsonTableRef(), getTableDescription(), getJsonClustering() != null);
        } else if (getTableFunction() != null) {
          dynamicDestinations =
              new TableFunctionDestinations<>(getTableFunction(), getJsonClustering() != null);
        }

        // Wrap with a DynamicDestinations class that will provide a schema. There might be no
        // schema provided if the create disposition is CREATE_NEVER.
        // The unchecked casts below presumably rely on every destination built in this branch
        // being keyed by TableDestination -- TODO confirm against the helper classes.
        if (getJsonSchema() != null) {
          dynamicDestinations =
              new ConstantSchemaDestinations<>(
                  (DynamicDestinations<T, TableDestination>) dynamicDestinations, getJsonSchema());
        } else if (getSchemaFromView() != null) {
          dynamicDestinations =
              new SchemaFromViewDestinations<>(
                  (DynamicDestinations<T, TableDestination>) dynamicDestinations,
                  getSchemaFromView());
        }

        // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
        if (getJsonTimePartitioning() != null || (getJsonClustering() != null)) {
          dynamicDestinations =
              new ConstantTimePartitioningClusteringDestinations<>(
                  (DynamicDestinations<T, TableDestination>) dynamicDestinations,
                  getJsonTimePartitioning(),
                  getJsonClustering());
        }
        // Outermost wrapper: attach the configured primary key as a table constraint.
        if (getPrimaryKey() != null) {
          dynamicDestinations =
              new DynamicDestinationsHelpers.ConstantTableConstraintsDestinations<>(
                  (DynamicDestinations<T, TableDestination>) dynamicDestinations,
                  new TableConstraints()
                      .setPrimaryKey(
                          new TableConstraints.PrimaryKey().setColumns(getPrimaryKey())));
        }
      }
      return expandTyped(input, dynamicDestinations);
    }