in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstopubsub/schemautils/PubSubUtils.java [54:160]
// Registers one value formatter per PubSubFields constant in FORMATTERS.
//
// Each formatter is called with (pb, chg): pb supplies the base64 conversion helpers
// (convertBase64ToString / convertBase64ToBytes) and chg is the change record that the
// value is read from, keyed by PubSubFields names.
// NOTE(review): chg exposes has/getString/getLong/getInt/getBoolean, which looks like
// org.json.JSONObject - confirm against the formatter interface declaration.
//
// The *_STRING, *_STRING_BASE64 and *_BYTES variants of a field all read the same
// *_BYTES key of the record and differ only in how the stored string is converted.
//
// Fields whose key may be missing from the record (column qualifier, cell timestamp,
// value, timestamp range) yield null instead of failing when the key is absent.
static {
  // --- Row key: stored under ROW_KEY_BYTES, exposed in three representations. ---
  FORMATTERS.put(
      PubSubFields.ROW_KEY_STRING,
      (pb, chg) -> {
        String rowkeyEncoded = chg.getString(PubSubFields.ROW_KEY_BYTES.name());
        return pb.convertBase64ToString(rowkeyEncoded);
      });
  FORMATTERS.put(
      PubSubFields.ROW_KEY_STRING_BASE64,
      // The stored string is passed through unchanged.
      (pb, chg) -> chg.getString(PubSubFields.ROW_KEY_BYTES.name()));
  FORMATTERS.put(
      PubSubFields.ROW_KEY_BYTES,
      (pb, chg) -> {
        String rowkeyEncoded = chg.getString(PubSubFields.ROW_KEY_BYTES.name());
        return pb.convertBase64ToBytes(rowkeyEncoded);
      });

  // --- Fields read directly under their own name, without a presence check. ---
  FORMATTERS.put(PubSubFields.MOD_TYPE, (pb, chg) -> chg.getString(PubSubFields.MOD_TYPE.name()));
  FORMATTERS.put(
      PubSubFields.COMMIT_TIMESTAMP,
      (pb, chg) -> chg.getLong(PubSubFields.COMMIT_TIMESTAMP.name()));
  FORMATTERS.put(
      PubSubFields.COLUMN_FAMILY, (pb, chg) -> chg.getString(PubSubFields.COLUMN_FAMILY.name()));

  // --- Column qualifier: optional, stored under COLUMN_BYTES. ---
  FORMATTERS.put(
      PubSubFields.COLUMN_BYTES,
      (pb, chg) -> {
        if (!chg.has(PubSubFields.COLUMN_BYTES.name())) {
          return null;
        }
        String qualifierEncoded = chg.getString(PubSubFields.COLUMN_BYTES.name());
        return pb.convertBase64ToBytes(qualifierEncoded);
      });
  FORMATTERS.put(
      PubSubFields.COLUMN_STRING,
      (pb, chg) -> {
        if (!chg.has(PubSubFields.COLUMN_BYTES.name())) {
          return null;
        }
        String qualifierEncoded = chg.getString(PubSubFields.COLUMN_BYTES.name());
        return pb.convertBase64ToString(qualifierEncoded);
      });
  FORMATTERS.put(
      PubSubFields.COLUMN_STRING_BASE64,
      (pb, chg) -> {
        // Same presence check as COLUMN_BYTES / COLUMN_STRING, so that all three
        // representations agree on a record that carries no column qualifier.
        if (!chg.has(PubSubFields.COLUMN_BYTES.name())) {
          return null;
        }
        return chg.getString(PubSubFields.COLUMN_BYTES.name());
      });

  // --- Cell timestamp: optional. ---
  FORMATTERS.put(
      PubSubFields.TIMESTAMP,
      (pb, chg) -> {
        if (!chg.has(PubSubFields.TIMESTAMP.name())) {
          return null;
        }
        return chg.getLong(PubSubFields.TIMESTAMP.name());
      });

  // --- Cell value: optional, stored under VALUE_BYTES. ---
  FORMATTERS.put(
      PubSubFields.VALUE_BYTES,
      (pb, chg) -> {
        if (!chg.has(PubSubFields.VALUE_BYTES.name())) {
          return null;
        }
        String valueEncoded = chg.getString(PubSubFields.VALUE_BYTES.name());
        return pb.convertBase64ToBytes(valueEncoded);
      });
  FORMATTERS.put(
      PubSubFields.VALUE_STRING,
      (pb, chg) -> {
        if (!chg.has(PubSubFields.VALUE_BYTES.name())) {
          return null;
        }
        String valueEncoded = chg.getString(PubSubFields.VALUE_BYTES.name());
        return pb.convertBase64ToString(valueEncoded);
      });
  FORMATTERS.put(
      PubSubFields.VALUE_STRING_BASE64,
      (pb, chg) -> {
        // Same presence check as VALUE_BYTES / VALUE_STRING, so that all three
        // representations agree on a record that carries no value.
        if (!chg.has(PubSubFields.VALUE_BYTES.name())) {
          return null;
        }
        return chg.getString(PubSubFields.VALUE_BYTES.name());
      });

  // --- Timestamp range bounds: optional. ---
  FORMATTERS.put(
      PubSubFields.TIMESTAMP_FROM,
      (pb, chg) -> {
        if (!chg.has(PubSubFields.TIMESTAMP_FROM.name())) {
          return null;
        }
        return chg.getLong(PubSubFields.TIMESTAMP_FROM.name());
      });
  FORMATTERS.put(
      PubSubFields.TIMESTAMP_TO,
      (pb, chg) -> {
        if (!chg.has(PubSubFields.TIMESTAMP_TO.name())) {
          return null;
        }
        return chg.getLong(PubSubFields.TIMESTAMP_TO.name());
      });

  // --- Remaining fields, read directly under their own name. ---
  FORMATTERS.put(PubSubFields.IS_GC, (pb, chg) -> chg.getBoolean(PubSubFields.IS_GC.name()));
  FORMATTERS.put(
      PubSubFields.SOURCE_INSTANCE,
      (pb, chg) -> chg.getString(PubSubFields.SOURCE_INSTANCE.name()));
  FORMATTERS.put(
      PubSubFields.SOURCE_CLUSTER,
      (pb, chg) -> chg.getString(PubSubFields.SOURCE_CLUSTER.name()));
  FORMATTERS.put(
      PubSubFields.SOURCE_TABLE, (pb, chg) -> chg.getString(PubSubFields.SOURCE_TABLE.name()));
  FORMATTERS.put(
      PubSubFields.TIEBREAKER, (pb, chg) -> chg.getInt(PubSubFields.TIEBREAKER.name()));

  // Fail at class initialization if a PubSubFields constant was added without
  // registering a formatter for it above.
  for (PubSubFields field : PubSubFields.values()) {
    Validate.notNull(FORMATTERS.get(field));
  }
}