public static void runDVT()

in spanner-data-validator-java/src/main/java/com/google/migration/JDBCToSpannerDVTWithHash.java [811:890]


  public static void runDVT(DVTOptionsCore options) throws IOException {
    if (options.getGenerateTableSpec()) {
      String sessionFileJson = options.getSessionFileJson();
      if (Helpers.isNullOrEmpty(sessionFileJson)) {
        throw new RuntimeException("Session file needs to be provided to generate the tableSpec from it!");
      }
      List<TableSpec> tableSpecs = generateTableSpec(options);
      String jsonFileName = String.format("tableSpec-%s.json", System.currentTimeMillis());
      TableSpecList.toJsonFile(tableSpecs, jsonFileName);
      LOG.info("TableSpec has been written to {} file", jsonFileName);
      return;
    }

    Pipeline p = Pipeline.create(options);

    p.getCoderRegistry().registerCoderForClass(HashResult.class, AvroCoder.of(HashResult.class));

    if(Helpers.isNullOrEmpty(options.getRunName())) {
      DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd-HH-mm-ss");
      String timestampStr = DateTime.now().toString(formatter);
      options.setRunName(String.format("Run-%s", timestampStr));
    }

    CustomTransformation customTransformation = CustomTransformation
        .builder(options.getTransformationJarPath(), options.getTransformationClassName())
        .setCustomParameters(options.getTransformationCustomParameters())
        .build();

    if (!Helpers.isNullOrEmpty(options.getTransformationJarPath()) && !Helpers.isNullOrEmpty(options.getTransformationClassName()) && Helpers.isNullOrEmpty(options.getSessionFileJson())) {
      throw new RuntimeException("Custom transformations is only supported with session file integration. Please specify the session file and re-run the pipeline");
    }

    Schema schema = null;
    if (!Helpers.isNullOrEmpty(options.getSessionFileJson())) {
      schema = SessionFileReader.read(options.getSessionFileJson());
    }

    List<TableSpec> tableSpecs = generateTableSpec(options);

    PipelineTracker pipelineTracker = new PipelineTracker();
    pipelineTracker.setMaxTablesInEffectAtOneTime(options.getMaxTablesInEffectAtOneTime());

    for(TableSpec tableSpec: tableSpecs) {
      BigQueryIO.Write<ComparerResult> comparerResultWrite =
          getComparisonResultsBQWriter(options, tableSpec.getTableName());

      BigQueryIO.Write<HashResult> jdbcConflictingRecordsWriter = null;
      BigQueryIO.Write<HashResult> spannerConflictingRecordsWriter = null;

      String conflictingRecordsBQTableName = options.getConflictingRecordsBQTableName();
      if(!Helpers.isNullOrEmpty(conflictingRecordsBQTableName)) {
        LOG.info(String.format("*******Enabling writing of conflicting records to table %s",
            conflictingRecordsBQTableName));
        spannerConflictingRecordsWriter = getConflictingRecordsBQWriter(options,
            options.getRunName(),
            tableSpec.getTableName(),
            "spanner");

        jdbcConflictingRecordsWriter = getConflictingRecordsBQWriter(options,
            options.getRunName(),
            tableSpec.getTableName(),
            "jdbc");
      } else {
        LOG.info(String.format("*******NOT enabling writing of conflicting records! %s",
            conflictingRecordsBQTableName));
      }

      configureComparisonPipeline(p,
          pipelineTracker,
          options,
          tableSpec,
          comparerResultWrite,
          jdbcConflictingRecordsWriter,
          spannerConflictingRecordsWriter,
          customTransformation,
          schema);
    }

    p.run();
  }