v2/spanner-change-streams-to-sharded-file-sink/src/main/java/com/google/cloud/teleport/v2/templates/transforms/AssignShardIdFn.java [259:287]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private Map<String, Object> fetchSpannerRecord(
      String tableName,
      com.google.cloud.Timestamp commitTimestamp,
      String serverTxnId,
      JsonNode keysJson)
      throws Exception {
    com.google.cloud.Timestamp staleReadTs =
        com.google.cloud.Timestamp.ofTimeSecondsAndNanos(
            commitTimestamp.getSeconds() - 1, commitTimestamp.getNanos());
    List<String> columns =
        ddl.table(tableName).columns().stream().map(Column::name).collect(Collectors.toList());
    // Stale read the spanner row for all the columns for timestamp 1 second less than the DELETE
    // event
    Struct row =
        spannerAccessor
            .getDatabaseClient()
            .singleUse(TimestampBound.ofReadTimestamp(staleReadTs))
            .readRow(tableName, generateKey(tableName, keysJson), columns);
    if (row == null) {
      throw new Exception(
          "stale read on Spanner returned null for table: "
              + tableName
              + ", commitTimestamp: "
              + commitTimestamp
              + " and serverTxnId:"
              + serverTxnId);
    }
    return getRowAsMap(row, columns, tableName);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/AssignShardIdFn.java [250:278]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private Map<String, Object> fetchSpannerRecord(
      String tableName,
      com.google.cloud.Timestamp commitTimestamp,
      String serverTxnId,
      JsonNode keysJson)
      throws Exception {
    com.google.cloud.Timestamp staleReadTs =
        com.google.cloud.Timestamp.ofTimeSecondsAndNanos(
            commitTimestamp.getSeconds() - 1, commitTimestamp.getNanos());
    List<String> columns =
        ddl.table(tableName).columns().stream().map(Column::name).collect(Collectors.toList());
    // Stale read the spanner row for all the columns for timestamp 1 second less than the DELETE
    // event
    Struct row =
        spannerAccessor
            .getDatabaseClient()
            .singleUse(TimestampBound.ofReadTimestamp(staleReadTs))
            .readRow(tableName, generateKey(tableName, keysJson), columns);
    if (row == null) {
      throw new Exception(
          "stale read on Spanner returned null for table: "
              + tableName
              + ", commitTimestamp: "
              + commitTimestamp
              + " and serverTxnId:"
              + serverTxnId);
    }
    return getRowAsMap(row, columns, tableName);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



