public TypedRead fromConfigRow()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java [208:359]


    /**
     * Rebuilds a {@link BigQueryIO.TypedRead} from the configuration {@link Row} produced when the
     * transform was translated.
     *
     * <p>Every optional field is only applied to the builder when the corresponding row value is
     * non-null. Fields that were introduced in later Beam releases are only read when the
     * pipeline's update-compatibility version is at least the release that introduced them.
     *
     * @param configRow row holding the serialized transform configuration
     * @param options pipeline options; {@code StreamingOptions#getUpdateCompatibilityVersion()} is
     *     consulted to decide which fields can be expected in {@code configRow}
     * @return the reconstructed read transform
     * @throws RuntimeException wrapping an {@link InvalidClassException} raised while deserializing
     *     a field for which no fallback is defined
     */
    public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
      String updateCompatibilityBeamVersion =
          options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
      // We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption
      // is not correctly passed in for pipelines that use Beam 2.53.0.
      // This is fixed for Beam 2.54.0 and later.
      updateCompatibilityBeamVersion =
          (updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0";

      try {
        BigQueryIO.TypedRead.Builder builder = new AutoValue_BigQueryIO_TypedRead.Builder<>();

        String jsonTableRef = configRow.getString("json_table_ref");
        if (jsonTableRef != null) {
          builder = builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
        }
        String query = configRow.getString("query");
        if (query != null) {
          builder = builder.setQuery(StaticValueProvider.of(query));
        }
        Boolean validate = configRow.getBoolean("validate");
        if (validate != null) {
          builder = builder.setValidate(validate);
        }
        Boolean flattenResults = configRow.getBoolean("flatten_results");
        if (flattenResults != null) {
          builder = builder.setFlattenResults(flattenResults);
        }
        Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
        if (useLegacySQL != null) {
          builder = builder.setUseLegacySql(useLegacySQL);
        }
        Boolean withTemplateCompatibility = configRow.getBoolean("with_template_compatibility");
        if (withTemplateCompatibility != null) {
          builder = builder.setWithTemplateCompatibility(withTemplateCompatibility);
        }
        byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
        if (bigqueryServicesBytes != null) {
          try {
            builder =
                builder.setBigQueryServices(
                    (BigQueryServices) fromByteArray(bigqueryServicesBytes));
          } catch (InvalidClassException e) {
            // The serialized implementation is not loadable in this environment; fall back to
            // the stock implementation instead of failing the upgrade.
            LOG.warn(
                "Could not use the provided `BigQueryServices` implementation when upgrading."
                    + "Using the default.");
            builder.setBigQueryServices(new BigQueryServicesImpl());
          }
        }
        byte[] parseFnBytes = configRow.getBytes("parse_fn");
        if (parseFnBytes != null) {
          builder = builder.setParseFn((SerializableFunction) fromByteArray(parseFnBytes));
        }
        byte[] datumReaderFactoryBytes = configRow.getBytes("datum_reader_factory");
        if (datumReaderFactoryBytes != null) {
          builder =
              builder.setDatumReaderFactory(
                  (SerializableFunction) fromByteArray(datumReaderFactoryBytes));
        }
        byte[] queryPriorityBytes = configRow.getBytes("query_priority");
        if (queryPriorityBytes != null) {
          builder = builder.setQueryPriority((QueryPriority) fromByteArray(queryPriorityBytes));
        }
        String queryLocation = configRow.getString("query_location");
        if (queryLocation != null) {
          builder = builder.setQueryLocation(queryLocation);
        }
        String queryTempDataset = configRow.getString("query_temp_dataset");
        if (queryTempDataset != null) {
          builder = builder.setQueryTempDataset(queryTempDataset);
        }

        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.57.0") >= 0) {
          // This property was added for Beam 2.57.0 hence not available when
          // upgrading the transform from previous Beam versions.
          String queryTempProject = configRow.getString("query_temp_project");
          if (queryTempProject != null) {
            builder = builder.setQueryTempProject(queryTempProject);
          }
        }

        byte[] methodBytes = configRow.getBytes("method");
        if (methodBytes != null) {
          builder = builder.setMethod((TypedRead.Method) fromByteArray(methodBytes));
        }
        byte[] formatBytes = configRow.getBytes("format");
        if (formatBytes != null) {
          builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
        }
        Collection<String> selectedFields = configRow.getArray("selected_fields");
        if (selectedFields != null && !selectedFields.isEmpty()) {
          // Copy the individual field names. ImmutableList.of(collection) would instead create a
          // single-element list whose only element is the collection itself.
          builder =
              builder.setSelectedFields(
                  StaticValueProvider.of(ImmutableList.copyOf(selectedFields)));
        }
        String rowRestriction = configRow.getString("row_restriction");
        if (rowRestriction != null) {
          builder = builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
        }
        byte[] coderBytes = configRow.getBytes("coder");
        if (coderBytes != null) {
          try {
            builder = builder.setCoder((Coder) fromByteArray(coderBytes));
          } catch (InvalidClassException e) {
            // Deliberately best-effort: the coder is simply left unset on the builder.
            LOG.warn(
                "Could not use the provided `Coder` implementation when upgrading."
                    + "Using the default.");
          }
        }
        String kmsKey = configRow.getString("kms_key");
        if (kmsKey != null) {
          builder = builder.setKmsKey(kmsKey);
        }
        byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
        if (typeDescriptorBytes != null) {
          builder = builder.setTypeDescriptor((TypeDescriptor) fromByteArray(typeDescriptorBytes));
        }
        byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
        if (toBeamRowFnBytes != null) {
          builder = builder.setToBeamRowFn((ToBeamRowFunction) fromByteArray(toBeamRowFnBytes));
        }
        byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
        if (fromBeamRowFnBytes != null) {
          builder =
              builder.setFromBeamRowFn((FromBeamRowFunction) fromByteArray(fromBeamRowFnBytes));
        }
        Boolean useAvroLogicalTypes = configRow.getBoolean("use_avro_logical_types");
        if (useAvroLogicalTypes != null) {
          builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
        }
        Boolean projectionPushdownApplied = configRow.getBoolean("projection_pushdown_applied");
        if (projectionPushdownApplied != null) {
          builder = builder.setProjectionPushdownApplied(projectionPushdownApplied);
        }

        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.55.0") < 0) {
          // We need to use defaults here for BQ read/write transforms upgraded
          // from older Beam versions.
          // See https://github.com/apache/beam/issues/30534.
          builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
          builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>());
        } else {
          // NOTE(review): these two values are deserialized without a null check, so this branch
          // presumably relies on the row always carrying them from 2.55.0 onwards - confirm
          // against the code that writes the config row.
          byte[] badRecordRouter = configRow.getBytes("bad_record_router");
          builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter));
          byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler");
          builder.setBadRecordErrorHandler(
              (ErrorHandler<BadRecord, ?>) fromByteArray(badRecordErrorHandler));
        }

        return builder.build();
      } catch (InvalidClassException e) {
        throw new RuntimeException(e);
      }
    }