in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/FailsafeModJsonToTableRowTransformer.java [225:381]
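/**
 * Converts the JSON string of a single change stream Mod into a BigQuery {@link TableRow},
 * populating the metadata fields, the Spanner primary key columns, and the tracked non-key
 * columns. When the mod does not carry the full new row, the remaining columns are filled in by
 * a snapshot read from Spanner at the commit timestamp.
 */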
private TableRow modJsonStringToTableRow(String modJsonString) {
String deadLetterMessage =
"check the dead letter queue for records that failed to be processed";
ObjectNode modObjectNode = null;
try {
modObjectNode = (ObjectNode) new ObjectMapper().readTree(modJsonString);
} catch (JsonProcessingException e) {
String errorMessage =
String.format(
"error parsing modJsonString input into %s; %s",
ObjectNode.class, deadLetterMessage);
LOG.error(errorMessage);
throw new RuntimeException(errorMessage, e);
}
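// Strip the BigQuery intermediate metadata fields; they are not part of the Mod payload.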
for (String excludeFieldName : BigQueryUtils.getBigQueryIntermediateMetadataFieldNames()) {
if (modObjectNode.has(excludeFieldName)) {
modObjectNode.remove(excludeFieldName);
}
}
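// Deserialize the cleaned-up JSON into a Mod.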
Mod mod = null;
try {
mod = Mod.fromJson(modObjectNode.toString());
} catch (IOException e) {
String errorMessage =
String.format(
"error converting %s to %s; %s", ObjectNode.class, Mod.class, deadLetterMessage);
LOG.error(errorMessage);
throw new RuntimeException(errorMessage, e);
}
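// The commit timestamp of the transaction that produced this mod; it is also used as the
// timestamp for the snapshot read below.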
String spannerTableName = mod.getTableName();
TrackedSpannerTable spannerTable;
com.google.cloud.Timestamp spannerCommitTimestamp =
com.google.cloud.Timestamp.ofTimeSecondsAndNanos(
mod.getCommitTimestampSeconds(), mod.getCommitTimestampNanos());
// Detect schema updates (newly added tables/columns) from the mod and propagate the changes
// into spannerTableByName, which stores schema information keyed by table name.
// Schema updates cannot be detected from DELETE mods because their newValuesJson is empty.
if (mod.getModType() != ModType.DELETE) {
spannerTableByName =
SchemaUpdateUtils.updateStoredSchemaIfNeeded(
spannerAccessor, spannerChangeStream, dialect, mod, spannerTableByName);
}
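// Look up the tracked schema for the table this mod belongs to.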
try {
spannerTable = checkStateNotNull(spannerTableByName.get(spannerTableName));
} catch (IllegalStateException e) {
String errorMessage =
String.format(
"Can not find spanner table %s in spannerTableByName", spannerTableName);
LOG.error(errorMessage);
throw new RuntimeException(errorMessage, e);
}
// Set metadata fields of the tableRow.
TableRow tableRow = new TableRow();
BigQueryUtils.setMetadataFiledsOfTableRow(
spannerTableName,
mod,
modJsonString,
spannerCommitTimestamp,
tableRow,
useStorageWriteApi);
JSONObject keysJsonObject = new JSONObject(mod.getKeysJson());
// Set Spanner key columns of the tableRow.
SpannerToBigQueryUtils.addSpannerPkColumnsToTableRow(
keysJsonObject, spannerTable.getPkColumns(), tableRow);
// Set non-key columns of the tableRow.
SpannerToBigQueryUtils.addSpannerNonPkColumnsToTableRow(
mod.getNewValuesJson(), spannerTable.getNonPkColumns(), tableRow, mod.getModType());
// For "INSERT" mod, we can get all columns from mod.
// For "DELETE" mod, we only set the key columns. For all non-key columns, we already
// populated "null".
if (mod.getModType() == ModType.INSERT || mod.getModType() == ModType.DELETE) {
return tableRow;
}
// For "NEW_ROW" and "NEW_ROW_AND_OLD_VALUES" value capture types, we can get all columns
// from mod.
if (mod.getValueCaptureType() == ValueCaptureType.NEW_ROW
|| mod.getValueCaptureType() == ValueCaptureType.NEW_ROW_AND_OLD_VALUES) {
return tableRow;
}
// For "UPDATE" mod, the Mod only contains the changed columns, unchanged tracked columns
// are not included, so we need to do a snapshot read to Spanner to get the full row image
// tracked by change stream, we want to re-read the updated columns as well to get a
// consistent view of the whole row after the transaction is committed.
// Note that the read can fail if the database version retention period (default to be one
// hour) has passed the snapshot read timestamp, similar to other error cases, the pipeline
// will put the failed mod into the retry deadletter queue, and retry it for 5 times, and
// then eventually add the failed mod into the severe deadletter queue which won't be
// processed by the pipeline again, users should process the severe deadletter queue
// themselves.
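// Build the Spanner primary key of the modified row from the mod's key values.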
Builder keyBuilder = com.google.cloud.spanner.Key.newBuilder();
for (TrackedSpannerColumn spannerColumn : spannerTable.getPkColumns()) {
String spannerColumnName = spannerColumn.getName();
if (keysJsonObject.has(spannerColumnName)) {
SpannerChangeStreamsUtils.appendToSpannerKey(spannerColumn, keysJsonObject, keyBuilder);
} else {
String errorMessage =
String.format(
"Cannot build Spanner key for snapshot read of UPDATE mod: no value found for key"
+ " column %s",
spannerColumnName);
LOG.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
}
}
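// The tracked non-key columns to read back from Spanner.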
List<TrackedSpannerColumn> spannerNonPkColumns = spannerTable.getNonPkColumns();
List<String> spannerNonPkColumnNames =
spannerNonPkColumns.stream()
.map(spannerNonPkColumn -> spannerNonPkColumn.getName())
.collect(Collectors.toList());
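// Read the full row at the commit timestamp to fill in the remaining non-key columns.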
int retryCount = 0;
while (true) {
try {
readSpannerRow(
spannerTable.getTableName(),
keyBuilder.build(),
spannerNonPkColumns,
spannerNonPkColumnNames,
spannerCommitTimestamp,
tableRow);
break;
} catch (Exception e) {
// Retry transient errors; rethrow once retryCount exceeds 3 (at most 5 attempts in total).
if (retryCount > 3) {
LOG.error("Caught exception from Spanner snapshot read: {}, throwing", e);
throw e;
} else {
LOG.error(
"Caught exception from Spanner snapshot read, current retry count: {}",
retryCount,
e);
// Wait for 1 second before the next retry.
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ex) {
LOG.warn("Caught InterruptedException while waiting to retry", ex);
// Restore the interrupt status so callers can observe the interruption.
Thread.currentThread().interrupt();
}
retryCount++;
}
}
}
return tableRow;
}