in spanner-data-validator-java/src/main/java/com/google/migration/dofns/CustomTransformationDoFn.java [91:133]
public void processElement(ProcessContext c) {
SourceRecord sourceRecord = c.element();
try {
//if customTransformer is not null, then do the custom transformation.
if (customTransformer != null) {
Map<String, Object> sourceRecordMap = getSourceRecordMap(sourceRecord);
MigrationTransformationResponse migrationTransformationResponse = getCustomTransformationResponse(
sourceRecordMap, tableName(), shardId());
if (migrationTransformationResponse.isEventFiltered()) {
LOG.info("Row was filtered by custom transformer");
c.output(new HashResult());
}
Map<String, Object> transformedCols = migrationTransformationResponse.getResponseRow();
//There are two possible cases: The column under transformation is an existing one
//or a new one.
//If it is an existing one, the sourceRecord needs to be updated in place at the
//positional value of that column.
//If it's a new column, add the new columns from custom transformation to the end of the existing sourceRow
//Sort the transformedCols map by the field name and add it to the existing sourceRecord
Map<String, Object> sortedTransformedCols = new TreeMap<>(transformedCols);
for (String transformedColName: sortedTransformedCols.keySet()) {
boolean fieldExists = false;
for (int i = 0; i < sourceRecord.length(); i++) {
if (sourceRecord.getField(i).getFieldName().equals(transformedColName)) {
sourceRecord.setField(i, transformedColName, getDataTypeFromSchema(transformedColName), sortedTransformedCols.get(transformedColName));
fieldExists = true;
}
}
if (!fieldExists) sourceRecord.addField(transformedColName, getDataTypeFromSchema(transformedColName) ,sortedTransformedCols.get(transformedColName));
}
}
HashResult hashResult = HashResult.fromSourceRecord(sourceRecord,
keyIndex(),
rangeFieldType(),
adjustTimestampPrecision(),
timestampThresholdKeyIndex());
c.output(hashResult);
} catch (Exception e) {
LOG.error("Error while processing element: ", e);
transformerErrors.inc();
c.output(new HashResult());
}
}