public void processElement()

in spanner-data-validator-java/src/main/java/com/google/migration/dofns/CustomTransformationDoFn.java [91:133]


  /**
   * Converts one {@code SourceRecord} into a {@code HashResult}, optionally running the
   * configured custom transformer over the record first.
   *
   * <p>When {@code customTransformer} is non-null, the transformer response is applied to the
   * record in place: a transformed column whose name matches an existing field overwrites that
   * field at its current position; any other transformed column is appended to the end of the
   * record. Columns are processed in sorted name order so appended columns land in a
   * deterministic order.
   *
   * <p>If the transformer reports the row as filtered, an empty {@code HashResult} is emitted and
   * processing of this element stops. If any exception is thrown, it is logged,
   * {@code transformerErrors} is incremented and an empty {@code HashResult} is emitted.
   *
   * @param c the process context supplying the input element and receiving the output
   */
  public void processElement(ProcessContext c) {
    SourceRecord sourceRecord = c.element();
    try {
      // Only run the custom transformation when a transformer has been configured.
      if (customTransformer != null) {
        Map<String, Object> sourceRecordMap = getSourceRecordMap(sourceRecord);
        MigrationTransformationResponse migrationTransformationResponse =
            getCustomTransformationResponse(sourceRecordMap, tableName(), shardId());

        if (migrationTransformationResponse.isEventFiltered()) {
          LOG.info("Row was filtered by custom transformer");
          c.output(new HashResult());
          // Stop here: a filtered row must not also be transformed and hashed below, otherwise
          // a second result is emitted for the same element.
          return;
        }

        // Sort by column name so that newly appended columns are added in a stable order.
        Map<String, Object> sortedTransformedCols =
            new TreeMap<>(migrationTransformationResponse.getResponseRow());

        for (Map.Entry<String, Object> transformedCol : sortedTransformedCols.entrySet()) {
          String transformedColName = transformedCol.getKey();
          Object transformedValue = transformedCol.getValue();

          // Existing column: overwrite in place at every position carrying this field name.
          boolean fieldExists = false;
          for (int i = 0; i < sourceRecord.length(); i++) {
            if (sourceRecord.getField(i).getFieldName().equals(transformedColName)) {
              sourceRecord.setField(
                  i,
                  transformedColName,
                  getDataTypeFromSchema(transformedColName),
                  transformedValue);
              fieldExists = true;
            }
          }

          // New column: append it to the end of the existing source row.
          if (!fieldExists) {
            sourceRecord.addField(
                transformedColName,
                getDataTypeFromSchema(transformedColName),
                transformedValue);
          }
        }
      }

      HashResult hashResult = HashResult.fromSourceRecord(sourceRecord,
          keyIndex(),
          rangeFieldType(),
          adjustTimestampPrecision(),
          timestampThresholdKeyIndex());
      c.output(hashResult);
    } catch (Exception e) {
      LOG.error("Error while processing element: ", e);
      transformerErrors.inc();
      c.output(new HashResult());
    }
  }