private TableRow modJsonStringToTableRow()

in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/FailsafeModJsonToTableRowTransformer.java [225:381]


      /**
       * Converts a JSON-serialized {@link Mod} into a BigQuery {@link TableRow}.
       *
       * <p>Steps, in order:
       *
       * <ol>
       *   <li>Parse {@code modJsonString} as a JSON object and drop the field names returned by
       *       {@code BigQueryUtils.getBigQueryIntermediateMetadataFieldNames()}.
       *   <li>Deserialize the remaining JSON into a {@link Mod}.
       *   <li>For non-DELETE mods, refresh {@code spannerTableByName} (this reassigns the field) so
       *       newly added tables/columns are picked up.
       *   <li>Populate the metadata, primary-key and non-key columns of the row from the mod.
       *   <li>For mods that are neither INSERT nor DELETE (i.e. UPDATE) and whose value capture type
       *       is neither {@code NEW_ROW} nor {@code NEW_ROW_AND_OLD_VALUES}, snapshot-read the
       *       non-key columns from Spanner at the commit timestamp via {@code readSpannerRow},
       *       retrying failed reads.
       * </ol>
       *
       * @param modJsonString JSON representation of a {@link Mod}, possibly carrying intermediate
       *     metadata fields
       * @return the populated table row
       * @throws RuntimeException if the input is not a JSON object, cannot be converted to a {@link
       *     Mod}, or names a table missing from {@code spannerTableByName}; also whatever {@code
       *     readSpannerRow} throws once retries are exhausted or the retry wait is interrupted
       * @throws IllegalArgumentException if an UPDATE mod that needs a snapshot read lacks a value
       *     for a primary-key column
       */
      private TableRow modJsonStringToTableRow(String modJsonString) {
        String deadLetterMessage =
            "check dead letter queue for unprocessed records that failed to be processed";

        // readTree can yield a non-object node (e.g. a JSON array) or, depending on the Jackson
        // version, null/MissingNode for empty input. Check the node type explicitly instead of
        // letting a blind cast fail with a ClassCastException/NullPointerException.
        Object parsedModNode = null;
        JsonProcessingException parseException = null;
        try {
          parsedModNode = new ObjectMapper().readTree(modJsonString);
        } catch (JsonProcessingException e) {
          parseException = e;
        }
        if (!(parsedModNode instanceof ObjectNode)) {
          String errorMessage =
              String.format(
                  "error parsing modJsonString input into %s; %s",
                  ObjectNode.class, deadLetterMessage);
          LOG.error(errorMessage);
          throw new RuntimeException(errorMessage, parseException);
        }
        ObjectNode modObjectNode = (ObjectNode) parsedModNode;

        // Strip intermediate metadata fields before building the Mod; remove() is a no-op when the
        // field is absent.
        for (String excludeFieldName : BigQueryUtils.getBigQueryIntermediateMetadataFieldNames()) {
          modObjectNode.remove(excludeFieldName);
        }

        Mod mod;
        try {
          mod = Mod.fromJson(modObjectNode.toString());
        } catch (IOException e) {
          String errorMessage =
              String.format(
                  "error converting %s to %s; %s", ObjectNode.class, Mod.class, deadLetterMessage);
          LOG.error(errorMessage);
          throw new RuntimeException(errorMessage, e);
        }
        String spannerTableName = mod.getTableName();
        com.google.cloud.Timestamp spannerCommitTimestamp =
            com.google.cloud.Timestamp.ofTimeSecondsAndNanos(
                mod.getCommitTimestampSeconds(), mod.getCommitTimestampNanos());

        // Detect schema updates (newly added tables/columns) from mod and propagate changes into
        // spannerTableByName which stores schema information by table name.
        // Not able to get schema update from DELETE mods as they have empty newValuesJson.
        if (mod.getModType() != ModType.DELETE) {
          spannerTableByName =
              SchemaUpdateUtils.updateStoredSchemaIfNeeded(
                  spannerAccessor, spannerChangeStream, dialect, mod, spannerTableByName);
        }

        TrackedSpannerTable spannerTable = spannerTableByName.get(spannerTableName);
        if (spannerTable == null) {
          String errorMessage =
              String.format(
                  "Can not find spanner table %s in spannerTableByName", spannerTableName);
          LOG.error(errorMessage);
          throw new RuntimeException(errorMessage);
        }

        // Set metadata fields of the tableRow.
        TableRow tableRow = new TableRow();
        BigQueryUtils.setMetadataFiledsOfTableRow(
            spannerTableName,
            mod,
            modJsonString,
            spannerCommitTimestamp,
            tableRow,
            useStorageWriteApi);
        JSONObject keysJsonObject = new JSONObject(mod.getKeysJson());
        // Set Spanner key columns of the tableRow.
        SpannerToBigQueryUtils.addSpannerPkColumnsToTableRow(
            keysJsonObject, spannerTable.getPkColumns(), tableRow);

        // Set non-key columns of the tableRow.
        SpannerToBigQueryUtils.addSpannerNonPkColumnsToTableRow(
            mod.getNewValuesJson(), spannerTable.getNonPkColumns(), tableRow, mod.getModType());

        // For "INSERT" mod, we can get all columns from mod.
        // For "DELETE" mod, we only set the key columns. For all non-key columns, we already
        // populated "null".
        if (mod.getModType() == ModType.INSERT || mod.getModType() == ModType.DELETE) {
          return tableRow;
        }

        // For "NEW_ROW" and "NEW_ROW_AND_OLD_VALUES" value capture types, we can get all columns
        // from mod.
        if (mod.getValueCaptureType() == ValueCaptureType.NEW_ROW
            || mod.getValueCaptureType() == ValueCaptureType.NEW_ROW_AND_OLD_VALUES) {
          return tableRow;
        }

        // For "UPDATE" mod, the Mod only contains the changed columns, unchanged tracked columns
        // are not included, so we need to do a snapshot read to Spanner to get the full row image
        // tracked by change stream, we want to re-read the updated columns as well to get a
        // consistent view of the whole row after the transaction is committed.
        // Note that the read can fail if the database version retention period (default to be one
        // hour) has passed the snapshot read timestamp, similar to other error cases, the pipeline
        // will put the failed mod into the retry deadletter queue, and retry it for 5 times, and
        // then eventually add the failed mod into the severe deadletter queue which won't be
        // processed by the pipeline again, users should process the severe deadletter queue
        // themselves.
        Builder keyBuilder = com.google.cloud.spanner.Key.newBuilder();
        for (TrackedSpannerColumn spannerColumn : spannerTable.getPkColumns()) {
          String spannerColumnName = spannerColumn.getName();
          if (!keysJsonObject.has(spannerColumnName)) {
            String errorMessage =
                String.format(
                    "Caught exception when snapshot reading key column for UPDATE mod: Cannot find"
                        + " value for key column %s",
                    spannerColumnName);
            LOG.error(errorMessage);
            throw new IllegalArgumentException(errorMessage);
          }
          SpannerChangeStreamsUtils.appendToSpannerKey(spannerColumn, keysJsonObject, keyBuilder);
        }

        List<TrackedSpannerColumn> spannerNonPkColumns = spannerTable.getNonPkColumns();
        List<String> spannerNonPkColumnNames =
            spannerNonPkColumns.stream()
                .map(TrackedSpannerColumn::getName)
                .collect(Collectors.toList());

        // Retry transient snapshot-read failures at most maxSnapshotReadRetries times (so at most
        // maxSnapshotReadRetries + 1 attempts in total), waiting one second between attempts.
        final int maxSnapshotReadRetries = 3;
        for (int retryCount = 0; ; retryCount++) {
          try {
            readSpannerRow(
                spannerTable.getTableName(),
                keyBuilder.build(),
                spannerNonPkColumns,
                spannerNonPkColumnNames,
                spannerCommitTimestamp,
                tableRow);
            return tableRow;
          } catch (Exception e) {
            if (retryCount >= maxSnapshotReadRetries) {
              // Pass the exception as the trailing argument so SLF4J logs its stack trace.
              LOG.error(
                  "Caught exception from Spanner snapshot read after {} retries, throwing",
                  retryCount,
                  e);
              throw e;
            }
            LOG.error(
                "Caught exception from Spanner snapshot read, current retry count: {}",
                retryCount,
                e);
            try {
              TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException ex) {
              // Restore the interrupt status and stop retrying: the thread is being asked to stop.
              Thread.currentThread().interrupt();
              e.addSuppressed(ex);
              throw e;
            }
          }
        }
      }