public Write fromConfigRow()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java [617:927]


    public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
      String updateCompatibilityBeamVersion =
          options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
      // We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption
      // is not correctly passed in for pipelines that use Beam 2.53.0.
      // This is fixed for Beam 2.54.0 and later.
      updateCompatibilityBeamVersion =
          (updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0";

      try {
        BigQueryIO.Write.Builder builder = new AutoValue_BigQueryIO_Write.Builder<>();

        String jsonTableRef = configRow.getString("json_table_ref");
        if (jsonTableRef != null) {
          builder = builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
        }
        byte[] tableFunctionBytes = configRow.getBytes("table_function");
        if (tableFunctionBytes != null) {
          builder =
              builder.setTableFunction(
                  (SerializableFunction<ValueInSingleWindow, TableDestination>)
                      fromByteArray(tableFunctionBytes));
        }
        byte[] formatFunctionBytes = configRow.getBytes("format_function");
        if (formatFunctionBytes != null) {
          builder =
              builder.setFormatFunction(
                  (SerializableFunction<?, TableRow>) fromByteArray(formatFunctionBytes));
        }
        byte[] formatRecordOnFailureFunctionBytes =
            configRow.getBytes("format_record_on_failure_function");
        if (formatRecordOnFailureFunctionBytes != null) {
          builder =
              builder.setFormatRecordOnFailureFunction(
                  (SerializableFunction<?, TableRow>)
                      fromByteArray(formatRecordOnFailureFunctionBytes));
        }
        byte[] avroRowWriterFactoryBytes = configRow.getBytes("avro_row_writer_factory");
        if (avroRowWriterFactoryBytes != null) {
          builder =
              builder.setAvroRowWriterFactory(
                  (AvroRowWriterFactory) fromByteArray(avroRowWriterFactoryBytes));
        }
        byte[] avroSchemaFactoryBytes = configRow.getBytes("avro_schema_factory");
        if (avroSchemaFactoryBytes != null) {
          builder =
              builder.setAvroSchemaFactory(
                  (SerializableFunction) fromByteArray(avroSchemaFactoryBytes));
        }
        Boolean useAvroLogicalTypes = configRow.getBoolean("use_avro_logical_types");
        if (useAvroLogicalTypes != null) {
          builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
        }
        byte[] dynamicDestinationsBytes = configRow.getBytes("dynamic_destinations");
        if (dynamicDestinationsBytes != null) {
          builder =
              builder.setDynamicDestinations(
                  (DynamicDestinations) fromByteArray(dynamicDestinationsBytes));
        }
        String jsonSchema = configRow.getString("json_schema");
        if (jsonSchema != null) {
          builder = builder.setJsonSchema(StaticValueProvider.of(jsonSchema));
        }
        String jsonTimePartitioning = configRow.getString("json_time_partitioning");
        if (jsonTimePartitioning != null) {
          builder = builder.setJsonTimePartitioning(StaticValueProvider.of(jsonTimePartitioning));
        }
        // Translation with Clustering is broken before 2.56.0, where we used to attempt to
        // serialize a non-serializable Clustering object to bytes.
        // In 2.56.0 onwards, we translate using the json string representation instead.
        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.56.0") >= 0) {
          String jsonClustering = configRow.getString("clustering");
          if (jsonClustering != null) {
            builder = builder.setJsonClustering(StaticValueProvider.of(jsonClustering));
          }
        }
        byte[] createDispositionBytes = configRow.getBytes("create_disposition");
        if (createDispositionBytes != null) {
          builder =
              builder.setCreateDisposition(
                  (CreateDisposition) fromByteArray(createDispositionBytes));
        }
        byte[] writeDispositionBytes = configRow.getBytes("write_disposition");
        if (writeDispositionBytes != null) {
          builder =
              builder.setWriteDisposition((WriteDisposition) fromByteArray(writeDispositionBytes));
        }
        Collection<byte[]> schemaUpdateOptionsData = configRow.getArray("schema_update_options");
        if (schemaUpdateOptionsData != null) {
          Set<SchemaUpdateOption> schemaUpdateOptions =
              schemaUpdateOptionsData.stream()
                  .map(
                      data -> {
                        try {
                          return (SchemaUpdateOption) fromByteArray(data);
                        } catch (InvalidClassException e) {
                          throw new RuntimeException(e);
                        }
                      })
                  .collect(Collectors.toSet());
          builder = builder.setSchemaUpdateOptions(schemaUpdateOptions);
        } else {
          // This property is not nullable.
          builder = builder.setSchemaUpdateOptions(Collections.emptySet());
        }
        String tableDescription = configRow.getString("table_description");
        if (tableDescription != null) {
          builder = builder.setTableDescription(tableDescription);
        }
        Map<String, String> biglakeConfiguration = configRow.getMap("biglake_configuration");
        if (biglakeConfiguration != null) {
          builder = builder.setBigLakeConfiguration(biglakeConfiguration);
        }
        Boolean validate = configRow.getBoolean("validate");
        if (validate != null) {
          builder = builder.setValidate(validate);
        }
        byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
        if (bigqueryServicesBytes != null) {
          try {
            builder =
                builder.setBigQueryServices(
                    (BigQueryServices) fromByteArray(bigqueryServicesBytes));
          } catch (InvalidClassException e) {
            LOG.warn(
                "Could not use the provided `BigQueryServices` implementation when upgrading."
                    + "Using the default.");
            builder.setBigQueryServices(new BigQueryServicesImpl());
          }
        }
        Integer maxFilesPerBundle = configRow.getInt32("max_files_per_bundle");
        if (maxFilesPerBundle != null) {
          builder = builder.setMaxFilesPerBundle(maxFilesPerBundle);
        }
        Long maxFileSize = configRow.getInt64("max_file_size");
        if (maxFileSize != null) {
          builder = builder.setMaxFileSize(maxFileSize);
        }
        Integer numFileShards = configRow.getInt32("num_file_shards");
        if (numFileShards != null) {
          builder = builder.setNumFileShards(numFileShards);
        }
        Integer numStorageWriteApiStreams = configRow.getInt32("num_storage_write_api_streams");
        if (numStorageWriteApiStreams != null) {
          builder = builder.setNumStorageWriteApiStreams(numStorageWriteApiStreams);
        }

        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.60.0") >= 0) {
          Boolean propagateSuccessfulStorageApiWrites =
              configRow.getBoolean("propagate_successful_storage_api_writes");
          if (propagateSuccessfulStorageApiWrites != null) {
            builder =
                builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
          }

          byte[] predicate =
              configRow.getBytes("propagate_successful_storage_api_writes_predicate");
          if (predicate != null) {
            builder =
                builder.setPropagateSuccessfulStorageApiWritesPredicate(
                    (Predicate<String>) fromByteArray(predicate));
          }
        } else {
          builder.setPropagateSuccessfulStorageApiWrites(false);
          builder.setPropagateSuccessfulStorageApiWritesPredicate(Predicates.alwaysTrue());
        }

        Integer maxFilesPerPartition = configRow.getInt32("max_files_per_partition");
        if (maxFilesPerPartition != null) {
          builder = builder.setMaxFilesPerPartition(maxFilesPerPartition);
        }
        Long maxBytesPerPartition = configRow.getInt64("max_bytes_per_partition");
        if (maxBytesPerPartition != null) {
          builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
        }

        // We need to update the 'triggerring_frequency' field name for pipelines that are upgraded
        // from Beam 2.53.0 due to https://github.com/apache/beam/pull/29785.
        // This is fixed for Beam 2.54.0 and later.
        String triggeringFrequencyFieldName =
            TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.53.0") == 0
                ? "triggerring_frequency"
                : "triggering_frequency";

        Duration triggeringFrequency = configRow.getValue(triggeringFrequencyFieldName);
        if (triggeringFrequency != null) {
          builder =
              builder.setTriggeringFrequency(
                  org.joda.time.Duration.millis(triggeringFrequency.toMillis()));
        }
        byte[] methodBytes = configRow.getBytes("method");
        if (methodBytes != null) {
          builder = builder.setMethod((Write.Method) fromByteArray(methodBytes));
        }
        String loadJobProjectId = configRow.getString("load_job_project_id");
        if (loadJobProjectId != null) {
          builder = builder.setLoadJobProjectId(StaticValueProvider.of(loadJobProjectId));
        }
        byte[] failedInsertRetryPolicyBytes = configRow.getBytes("failed_insert_retry_policy");
        if (failedInsertRetryPolicyBytes != null) {
          builder =
              builder.setFailedInsertRetryPolicy(
                  (InsertRetryPolicy) fromByteArray(failedInsertRetryPolicyBytes));
        }
        String customGcsTempLocations = configRow.getString("custom_gcs_temp_location");
        if (customGcsTempLocations != null) {
          builder =
              builder.setCustomGcsTempLocation(StaticValueProvider.of(customGcsTempLocations));
        }
        Boolean extendedErrorInfo = configRow.getBoolean("extended_error_info");
        if (extendedErrorInfo != null) {
          builder = builder.setExtendedErrorInfo(extendedErrorInfo);
        }
        Boolean skipInvalidRows = configRow.getBoolean("skip_invalid_rows");
        if (skipInvalidRows != null) {
          builder = builder.setSkipInvalidRows(skipInvalidRows);
        }
        Boolean ignoreUnknownValues = configRow.getBoolean("ignore_unknown_values");
        if (ignoreUnknownValues != null) {
          builder = builder.setIgnoreUnknownValues(ignoreUnknownValues);
        }
        Boolean ignoreInsertIds = configRow.getBoolean("ignore_insert_ids");
        if (ignoreInsertIds != null) {
          builder = builder.setIgnoreInsertIds(ignoreInsertIds);
        }
        Integer maxRetryJobs = configRow.getInt32("max_retry_jobs");
        if (maxRetryJobs != null) {
          builder = builder.setMaxRetryJobs(maxRetryJobs);
        }
        String kmsKey = configRow.getString("kms_key");
        if (kmsKey != null) {
          builder = builder.setKmsKey(kmsKey);
        }
        Collection<String> primaryKey = configRow.getArray("primary_key");
        if (primaryKey != null && !primaryKey.isEmpty()) {
          builder = builder.setPrimaryKey(ImmutableList.of(primaryKey));
        }
        byte[] defaultMissingValueInterpretationsBytes =
            configRow.getBytes("default_missing_value_interpretation");
        if (defaultMissingValueInterpretationsBytes != null) {
          builder =
              builder.setDefaultMissingValueInterpretation(
                  (MissingValueInterpretation)
                      fromByteArray(defaultMissingValueInterpretationsBytes));
        }
        Boolean optimizeWrites = configRow.getBoolean("optimize_writes");
        if (optimizeWrites != null) {
          builder = builder.setOptimizeWrites(optimizeWrites);
        }
        Boolean useBeamSchema = configRow.getBoolean("use_beam_schema");
        if (useBeamSchema != null) {
          builder = builder.setUseBeamSchema(useBeamSchema);
        }
        Boolean autoSharding = configRow.getBoolean("auto_sharding");
        if (autoSharding != null) {
          builder = builder.setAutoSharding(autoSharding);
        }
        Boolean propagateSuccessful = configRow.getBoolean("propagate_successful");
        if (propagateSuccessful != null) {
          builder = builder.setPropagateSuccessful(propagateSuccessful);
        }
        Boolean autoSchemaUpdate = configRow.getBoolean("auto_schema_update");
        if (autoSchemaUpdate != null) {
          builder = builder.setAutoSchemaUpdate(autoSchemaUpdate);
        }
        byte[] writeProtosClasses = configRow.getBytes("write_protos_class");
        if (writeProtosClasses != null) {
          builder =
              builder.setWriteProtosClass(
                  (Class) fromByteArray(defaultMissingValueInterpretationsBytes));
        }
        Boolean directWriteProtos = configRow.getBoolean("direct_write_protos");
        if (directWriteProtos != null) {
          builder = builder.setDirectWriteProtos(directWriteProtos);
        }
        byte[] deterministicRecordIdFnBytes = configRow.getBytes("deterministic_record_id_fn");
        if (deterministicRecordIdFnBytes != null) {
          builder =
              builder.setDeterministicRecordIdFn(
                  (SerializableFunction) fromByteArray(deterministicRecordIdFnBytes));
        }
        String writeTempDataset = configRow.getString("write_temp_dataset");
        if (writeTempDataset != null) {
          builder = builder.setWriteTempDataset(writeTempDataset);
        }
        byte[] rowMutationInformationFnBytes = configRow.getBytes("row_mutation_information_fn");
        if (rowMutationInformationFnBytes != null) {
          builder =
              builder.setRowMutationInformationFn(
                  (SerializableFunction) fromByteArray(rowMutationInformationFnBytes));
        }

        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.55.0") < 0) {
          // We need to use defaults here for BQ rear/write transforms upgraded
          // from older Beam versions.
          // See https://github.com/apache/beam/issues/30534.
          builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
          builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>());
        } else {
          byte[] badRecordRouter = configRow.getBytes("bad_record_router");
          builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter));
          byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler");
          builder.setBadRecordErrorHandler(
              (ErrorHandler<BadRecord, ?>) fromByteArray(badRecordErrorHandler));
        }

        return builder.build();
      } catch (InvalidClassException e) {
        throw new RuntimeException(e);
      }
    }