in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigquery/schemautils/BigQueryUtils.java [48:169]
static {
  // Registers one formatter per ChangelogColumn. Each formatter receives the converter object
  // and the change entry, and produces the value for that column (or null).
  // The registration order below is kept stable on purpose.

  // Row key: both representations are read from the ROW_KEY_BYTES entry.
  FORMATTERS.put(
      ChangelogColumn.ROW_KEY_STRING,
      (converter, entry) ->
          converter.convertBase64ToString(
              entry.getString(ChangelogColumn.ROW_KEY_BYTES.name())));
  FORMATTERS.put(
      ChangelogColumn.ROW_KEY_BYTES,
      (converter, entry) ->
          converter.convertBase64ToBytes(
              entry.getString(ChangelogColumn.ROW_KEY_BYTES.name())));

  // Columns that are read unconditionally as strings under their own name.
  for (ChangelogColumn plain :
      new ChangelogColumn[] {
        ChangelogColumn.MOD_TYPE, ChangelogColumn.COMMIT_TIMESTAMP, ChangelogColumn.COLUMN_FAMILY
      }) {
    FORMATTERS.put(plain, (converter, entry) -> entry.getString(plain.name()));
  }

  // Column qualifier: null when the entry has no COLUMN key, otherwise converted to a string.
  FORMATTERS.put(
      ChangelogColumn.COLUMN,
      (converter, entry) ->
          entry.has(ChangelogColumn.COLUMN.name())
              ? converter.convertBase64ToString(entry.getString(ChangelogColumn.COLUMN.name()))
              : null);

  // Columns that yield null when their key is absent, and the stored string otherwise.
  for (ChangelogColumn optional :
      new ChangelogColumn[] {ChangelogColumn.TIMESTAMP, ChangelogColumn.TIMESTAMP_NUM}) {
    FORMATTERS.put(
        optional,
        (converter, entry) ->
            entry.has(optional.name()) ? entry.getString(optional.name()) : null);
  }

  // Cell value: both representations are read from the VALUE_BYTES entry; null when absent.
  FORMATTERS.put(
      ChangelogColumn.VALUE_STRING,
      (converter, entry) ->
          entry.has(ChangelogColumn.VALUE_BYTES.name())
              ? converter.convertBase64ToString(
                  entry.getString(ChangelogColumn.VALUE_BYTES.name()))
              : null);
  FORMATTERS.put(
      ChangelogColumn.VALUE_BYTES,
      (converter, entry) ->
          entry.has(ChangelogColumn.VALUE_BYTES.name())
              ? converter.convertBase64ToBytes(
                  entry.getString(ChangelogColumn.VALUE_BYTES.name()))
              : null);

  // Range bounds: same "null when absent, string otherwise" handling as above.
  for (ChangelogColumn optional :
      new ChangelogColumn[] {
        ChangelogColumn.TIMESTAMP_FROM,
        ChangelogColumn.TIMESTAMP_FROM_NUM,
        ChangelogColumn.TIMESTAMP_TO,
        ChangelogColumn.TIMESTAMP_TO_NUM
      }) {
    FORMATTERS.put(
        optional,
        (converter, entry) ->
            entry.has(optional.name()) ? entry.getString(optional.name()) : null);
  }

  // Boolean flag rendered through Boolean.toString.
  FORMATTERS.put(
      ChangelogColumn.IS_GC,
      (converter, entry) -> Boolean.toString(entry.getBoolean(ChangelogColumn.IS_GC.name())));

  // Source identifiers, read unconditionally as strings.
  for (ChangelogColumn plain :
      new ChangelogColumn[] {
        ChangelogColumn.SOURCE_INSTANCE,
        ChangelogColumn.SOURCE_CLUSTER,
        ChangelogColumn.SOURCE_TABLE
      }) {
    FORMATTERS.put(plain, (converter, entry) -> entry.getString(plain.name()));
  }

  // Long value rendered through Long.toString.
  FORMATTERS.put(
      ChangelogColumn.TIEBREAKER,
      (converter, entry) -> Long.toString(entry.getLong(ChangelogColumn.TIEBREAKER.name())));

  // This formatter always yields null.
  FORMATTERS.put(ChangelogColumn.BQ_COMMIT_TIMESTAMP, (converter, entry) -> null);

  // Sanity check: every enum constant whose default value expression is blank must have a
  // formatter registered above.
  for (ChangelogColumn candidate : ChangelogColumn.values()) {
    if (!StringUtils.isBlank(candidate.getDefaultValueExpression())) {
      continue;
    }
    Validate.notNull(FORMATTERS.get(candidate));
  }
}