in projects/dataflow-gcs-avro-to-spanner-scd/src/main/java/com/google/cloud/solutions/dataflow/avrotospannerscd/utils/StructHelper.java [36:447]
public record StructHelper(Struct struct) {
public static StructHelper of(Struct struct) {
return new StructHelper(struct);
}
/**
* Removes a field from the Struct.
*
* <p>If the field does not exist, ignores it.
*
* @param fieldName Field that should be removed.
* @return StructHelper with the updated Struct.
*/
public StructHelper withoutField(String fieldName) {
Struct.Builder recordBuilder = makeStructBuilderWithoutField(fieldName);
return StructHelper.of(recordBuilder.build());
}
/**
* Adds or updates a field in the Struct.
*
* <p>If the field exist, updates it. Otherwise, it adds it.
*
* @param fieldName Field that should be added or updated.
* @return StructHelper with the updated Struct.
*/
public StructHelper withUpdatedFieldValue(String fieldName, Value value) {
Struct.Builder recordBuilder = Struct.newBuilder();
AtomicBoolean updatedField = new AtomicBoolean(false);
struct.getType().getStructFields().stream()
.forEach(
field -> {
recordBuilder
.set(field.getName())
.to(field.getName().equals(fieldName) ? value : struct.getValue(field.getName()));
if (field.getName().equals(fieldName)) {
updatedField.set(true);
}
});
// The field may not already exist in the Struct. If that's the case, we add it as a new field.
if (!updatedField.get()) {
recordBuilder.set(fieldName).to(value);
}
return StructHelper.of(recordBuilder.build());
}
private Struct.Builder makeStructBuilderWithoutField(String fieldName) {
Struct.Builder recordBuilder = Struct.newBuilder();
struct.getType().getStructFields().stream()
.filter(field -> !field.getName().equals(fieldName))
.forEach(field -> recordBuilder.set(field.getName()).to(struct.getValue(field.getName())));
return recordBuilder;
}
public KeyMaker keyMaker(Iterable<String> primaryKeyColumnNames) {
return new KeyMaker(primaryKeyColumnNames);
}
public MutationCreator mutationCreator(String tableName, Iterable<String> primaryKeyColumnNames) {
return new MutationCreator(tableName, primaryKeyColumnNames);
}
/** Creates Keys for Structs. */
public final class KeyMaker {
private final Iterable<String> primaryKeyColumnNames;
/**
* Initializes KeyMaker with primary key column names, which will be added to the key.
*
* @param primaryKeyColumnNames List of primary key column names.
*/
private KeyMaker(Iterable<String> primaryKeyColumnNames) {
this.primaryKeyColumnNames = primaryKeyColumnNames;
}
/**
* Generates the Key for a given record using the primary key column names.
*
* @return Primary Key for the record.
*/
public Key createKey() {
Key.Builder keyBuilder = Key.newBuilder();
addRecordFieldsToKeyBuilder(primaryKeyColumnNames, keyBuilder);
return keyBuilder.build();
}
/**
* Creates the Key and casts to string format.
*
* <p>Useful in cases where there is need for a deterministic coder. Key does not provide this
* guarantee.
*
* @return Record Key in string format.
*/
public String createKeyString() {
return createKey().toString();
}
/**
* Adds struct values to the Key builder for the requested column names.
*
* <p>Used to generate Keys for records.
*
* @param columnNames to add to the Key.
* @param keyBuilder Key Builder where key will be created.
*/
private void addRecordFieldsToKeyBuilder(Iterable<String> columnNames, Key.Builder keyBuilder) {
HashMap<String, StructField> structFieldMap = new HashMap<>();
struct()
.getType()
.getStructFields()
.forEach(field -> structFieldMap.put(field.getName(), field));
columnNames.forEach(
columnName -> {
StructField field = structFieldMap.get(columnName);
if (field == null) {
throw new RuntimeException(
String.format(
"Primary key name %s not found in record. Unable to create Key.",
columnName));
}
Value fieldValue = struct().getValue(field.getName());
addValueToKeyBuilder(keyBuilder, fieldValue);
});
}
private void addValueToKeyBuilder(Key.Builder keyBuilder, Value fieldValue) {
Type fieldType = fieldValue.getType();
switch (fieldType.getCode()) {
case BOOL:
keyBuilder.append(ValueHelper.of(fieldValue).getBoolOrNull());
break;
case BYTES:
keyBuilder.append(ValueHelper.of(fieldValue).getBytesOrNull());
break;
case DATE:
keyBuilder.append(ValueHelper.of(fieldValue).getDateOrNull());
break;
case FLOAT32:
keyBuilder.append(ValueHelper.of(fieldValue).getFloat32OrNull());
break;
case FLOAT64:
keyBuilder.append(ValueHelper.of(fieldValue).getFloat64OrNull());
break;
case INT64:
keyBuilder.append(ValueHelper.of(fieldValue).getInt64OrNull());
break;
case JSON:
keyBuilder.append(ValueHelper.of(fieldValue).getJsonOrNull());
break;
case NUMERIC:
case PG_NUMERIC:
keyBuilder.append(ValueHelper.of(fieldValue).getNumericOrNull());
break;
case PG_JSONB:
keyBuilder.append(ValueHelper.of(fieldValue).getPgJsonbOrNull());
break;
case STRING:
keyBuilder.append(ValueHelper.of(fieldValue).getStringOrNull());
break;
case TIMESTAMP:
keyBuilder.append(ValueHelper.of(fieldValue).getTimestampOrNull());
break;
default:
throw new UnsupportedOperationException(
String.format("Unsupported Spanner field type %s.", fieldType.getCode()));
}
}
}
/** Value types with null values. */
public static class NullValues {
public static final Boolean NULL_BOOLEAN = null;
public static final ByteArray NULL_BYTES = null;
public static final Date NULL_DATE = null;
public static final Float NULL_FLOAT32 = null;
public static final Double NULL_FLOAT64 = null;
public static final Integer NULL_INT32 = null;
public static final Long NULL_INT64 = null;
public static final String NULL_JSON = null;
public static final BigDecimal NULL_NUMERIC = null;
public static final String NULL_STRING = null;
public static final Timestamp NULL_TIMESTAMP = null;
}
/** Provides functionality to get and work with Values for Structs. */
private record ValueHelper(Value value) {
/**
* Initializes ValueHelper with a Value.
*
* @param value Value for which to create ValueHelper.
*/
static ValueHelper of(Value value) {
return new ValueHelper(value);
}
Boolean getBoolOrNull() {
return value.isNull()
? StructHelper.NullValues.NULL_BOOLEAN
: Boolean.valueOf(value.getBool());
}
ByteArray getBytesOrNull() {
return value.isNull() ? StructHelper.NullValues.NULL_BYTES : value.getBytes();
}
Date getDateOrNull() {
return value.isNull() ? StructHelper.NullValues.NULL_DATE : value.getDate();
}
Float getFloat32OrNull() {
return value.isNull()
? StructHelper.NullValues.NULL_FLOAT32
: Float.valueOf(value.getFloat32());
}
Double getFloat64OrNull() {
return value.isNull()
? StructHelper.NullValues.NULL_FLOAT64
: Double.valueOf(value.getFloat64());
}
Long getInt64OrNull() {
return value.isNull() ? StructHelper.NullValues.NULL_INT64 : Long.valueOf(value.getInt64());
}
String getJsonOrNull() {
return value.isNull() ? StructHelper.NullValues.NULL_JSON : value.getJson();
}
BigDecimal getNumericOrNull() {
return value.isNull() ? StructHelper.NullValues.NULL_NUMERIC : value.getNumeric();
}
String getPgJsonbOrNull() {
return value.isNull() ? StructHelper.NullValues.NULL_JSON : value.getPgJsonb();
}
String getStringOrNull() {
return value.isNull() ? StructHelper.NullValues.NULL_STRING : value.getString();
}
Timestamp getTimestampOrNull() {
return value.isNull() ? StructHelper.NullValues.NULL_TIMESTAMP : value.getTimestamp();
}
}
/**
* Compares two Spanner Structs based on the data type of the selected column.
*
* <p>In the context of this Comparator, a Struct is a Spanner record - i.e. a collection of
* columns and their values. One of those columns will be used to compare the records.
*/
public static final class StructComparator implements Comparator<Struct>, Serializable {
private final String orderByColumnName;
private final SortOrder sortOrder;
public enum SortOrder {
ASC,
DESC;
}
private StructComparator(String orderByColumnName, SortOrder sortOrder) {
this.orderByColumnName = orderByColumnName;
this.sortOrder = sortOrder;
}
/** Returns a new StructComparator that compares orderByColumnName in ASC order. */
public static StructComparator create(String orderByColumnName) {
return create(orderByColumnName, SortOrder.ASC);
}
/** Returns a new StructComparator that compares orderByColumnName in given sort order. */
public static StructComparator create(String orderByColumnName, @Nullable SortOrder sortOrder) {
if (sortOrder == null) {
return StructComparator.create(orderByColumnName);
}
return new StructComparator(orderByColumnName, sortOrder);
}
/** Creates the StructComparator, which will return the reverse of the natural ordering. */
@Override
public Comparator<Struct> reversed() {
return new StructComparator(this.orderByColumnName, SortOrder.DESC);
}
/**
* {@inheritDoc}
*
* <p>Nulls are sorted first, following SQL behaviour.
*
* <p>Order is reversed when called with reversed().
*/
@Override
public int compare(Struct leftStruct, Struct rightStruct) {
return naturalCompare(leftStruct, rightStruct) * (sortOrder.equals(SortOrder.DESC) ? -1 : 1);
}
private int naturalCompare(Struct leftStruct, Struct rightStruct) {
Value leftValue = leftStruct.getValue(orderByColumnName);
Value rightValue = rightStruct.getValue(orderByColumnName);
// Sort NULL first (ASC), following SQL behaviour.
if (leftValue.isNull() && rightValue.isNull()) {
return 0;
} else if (leftValue.isNull()) {
return -1;
} else if (rightValue.isNull()) {
return 1;
}
return switch (leftValue.getType().getCode()) {
case BOOL ->
ValueHelper.of(leftValue)
.getBoolOrNull()
.compareTo(ValueHelper.of(rightValue).getBoolOrNull());
case BYTES ->
throw new RuntimeException(
String.format("Unable to sort by ByteArray field %s.", orderByColumnName));
case DATE ->
ValueHelper.of(leftValue)
.getDateOrNull()
.compareTo(ValueHelper.of(rightValue).getDateOrNull());
case FLOAT32 ->
ValueHelper.of(leftValue)
.getFloat32OrNull()
.compareTo(ValueHelper.of(rightValue).getFloat32OrNull());
case FLOAT64 ->
ValueHelper.of(leftValue)
.getFloat64OrNull()
.compareTo(ValueHelper.of(rightValue).getFloat64OrNull());
case INT64 ->
ValueHelper.of(leftValue)
.getInt64OrNull()
.compareTo(ValueHelper.of(rightValue).getInt64OrNull());
case JSON ->
ValueHelper.of(leftValue)
.getJsonOrNull()
.compareTo(ValueHelper.of(rightValue).getJsonOrNull());
case NUMERIC, PG_NUMERIC ->
ValueHelper.of(leftValue)
.getNumericOrNull()
.compareTo(ValueHelper.of(rightValue).getNumericOrNull());
case PG_JSONB ->
ValueHelper.of(leftValue)
.getPgJsonbOrNull()
.compareTo(ValueHelper.of(rightValue).getPgJsonbOrNull());
case STRING ->
ValueHelper.of(leftValue)
.getStringOrNull()
.compareTo(ValueHelper.of(rightValue).getStringOrNull());
case TIMESTAMP ->
ValueHelper.of(leftValue)
.getTimestampOrNull()
.compareTo(ValueHelper.of(rightValue).getTimestampOrNull());
default ->
throw new UnsupportedOperationException(
String.format("Unsupported Spanner field type %s.", leftValue.getType().getCode()));
};
}
}
/** Creates Spanner mutations for the Struct and given tables. */
public final class MutationCreator {
private String tableName;
private Iterable<String> primaryKeyColumnNames;
private MutationCreator(String tableName, Iterable<String> primaryKeyColumnNames) {
this.tableName = tableName;
this.primaryKeyColumnNames = primaryKeyColumnNames;
}
/** Creates an insert mutation for the given Struct and table name. */
public Mutation createInsertMutation() {
Mutation.WriteBuilder insertMutationBuilder = Mutation.newInsertBuilder(tableName);
struct
.getType()
.getStructFields()
.forEach(
field ->
insertMutationBuilder.set(field.getName()).to(struct.getValue(field.getName())));
return insertMutationBuilder.build();
}
/** Creates an upsert (insertOrUpdate) mutation for the given Struct and table name. */
public Mutation createUpsertMutation() {
Mutation.WriteBuilder upsertMutationBuilder = Mutation.newInsertOrUpdateBuilder(tableName);
struct
.getType()
.getStructFields()
.forEach(
field ->
upsertMutationBuilder.set(field.getName()).to(struct.getValue(field.getName())));
return upsertMutationBuilder.build();
}
/** Creates a deletion mutation for the existing given Struct and table name. */
public Mutation createDeleteMutation() {
Key recordKey = StructHelper.of(struct).keyMaker(primaryKeyColumnNames).createKey();
return Mutation.delete(tableName, recordKey);
}
}
}