in spanner-data-validator-java/src/main/java/com/google/migration/JDBCToSpannerDVTWithHash.java [811:890]
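/**
 * Entry point for a validation run. Either writes a generated table spec to a JSON file
 * (when the generateTableSpec option is set) or builds and runs the Beam comparison
 * pipeline, one comparison branch per table spec.
 */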
public static void runDVT(DVTOptionsCore options) throws IOException {
  if (options.getGenerateTableSpec()) {
    String sessionFileJson = options.getSessionFileJson();
    if (Helpers.isNullOrEmpty(sessionFileJson)) {
      throw new RuntimeException(
          "A session file must be provided to generate the table spec from it!");
    }
    List<TableSpec> tableSpecs = generateTableSpec(options);
    String jsonFileName = String.format("tableSpec-%s.json", System.currentTimeMillis());
    TableSpecList.toJsonFile(tableSpecs, jsonFileName);
    LOG.info("TableSpec has been written to file {}", jsonFileName);
    return;
  }
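  // Build the Beam pipeline and register an Avro coder so HashResult elements
  // can be encoded between pipeline stages.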
  Pipeline p = Pipeline.create(options);
  p.getCoderRegistry().registerCoderForClass(HashResult.class, AvroCoder.of(HashResult.class));
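  // Default the run name to a timestamp when the caller has not supplied one.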
  if (Helpers.isNullOrEmpty(options.getRunName())) {
    DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd-HH-mm-ss");
    String timestampStr = DateTime.now().toString(formatter);
    options.setRunName(String.format("Run-%s", timestampStr));
  }
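  // Custom transformations require session-file integration, so validate that
  // combination before building the CustomTransformation handle.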
  if (!Helpers.isNullOrEmpty(options.getTransformationJarPath())
      && !Helpers.isNullOrEmpty(options.getTransformationClassName())
      && Helpers.isNullOrEmpty(options.getSessionFileJson())) {
    throw new RuntimeException(
        "Custom transformations are only supported with session file integration. "
            + "Please specify the session file and re-run the pipeline.");
  }
  CustomTransformation customTransformation = CustomTransformation
      .builder(options.getTransformationJarPath(), options.getTransformationClassName())
      .setCustomParameters(options.getTransformationCustomParameters())
      .build();
  Schema schema = null;
  if (!Helpers.isNullOrEmpty(options.getSessionFileJson())) {
    schema = SessionFileReader.read(options.getSessionFileJson());
  }
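  // Generate one spec per table to validate, and cap how many table comparisons
  // are in effect at one time via the tracker.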
  List<TableSpec> tableSpecs = generateTableSpec(options);
  PipelineTracker pipelineTracker = new PipelineTracker();
  pipelineTracker.setMaxTablesInEffectAtOneTime(options.getMaxTablesInEffectAtOneTime());
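  // Wire up one comparison branch per table: a BigQuery writer for the summary
  // results and, optionally, writers for the conflicting records on each side.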
  for (TableSpec tableSpec : tableSpecs) {
    BigQueryIO.Write<ComparerResult> comparerResultWrite =
        getComparisonResultsBQWriter(options, tableSpec.getTableName());
    BigQueryIO.Write<HashResult> jdbcConflictingRecordsWriter = null;
    BigQueryIO.Write<HashResult> spannerConflictingRecordsWriter = null;
    String conflictingRecordsBQTableName = options.getConflictingRecordsBQTableName();
    if (!Helpers.isNullOrEmpty(conflictingRecordsBQTableName)) {
      LOG.info("*******Enabling writing of conflicting records to table {}",
          conflictingRecordsBQTableName);
      spannerConflictingRecordsWriter = getConflictingRecordsBQWriter(options,
          options.getRunName(),
          tableSpec.getTableName(),
          "spanner");
      jdbcConflictingRecordsWriter = getConflictingRecordsBQWriter(options,
          options.getRunName(),
          tableSpec.getTableName(),
          "jdbc");
    } else {
      LOG.info("*******NOT enabling writing of conflicting records!");
    }
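    // Attach this table's comparison stages to the shared pipeline.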
    configureComparisonPipeline(p,
        pipelineTracker,
        options,
        tableSpec,
        comparerResultWrite,
        jdbcConflictingRecordsWriter,
        spannerConflictingRecordsWriter,
        customTransformation,
        schema);
  }
  p.run();
}
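// A minimal sketch of how this entry point is typically driven from main()
// (assuming standard Beam options wiring; the surrounding class is illustrative):
//
//   public static void main(String[] args) throws IOException {
//     DVTOptionsCore options = PipelineOptionsFactory.fromArgs(args)
//         .withValidation()
//         .as(DVTOptionsCore.class);
//     runDVT(options);
//   }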