in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstovectorsearch/BigtableChangeStreamsToVectorSearch.java [93:217]
/**
 * Builds and runs the pipeline that reads a Bigtable change stream, converts each mutation into
 * a Vector Search datapoint operation, and applies the resulting upserts and removals to the
 * configured Vector Search index in batches.
 *
 * <p>The supplied options are modified before the pipeline is created: streaming and Streaming
 * Engine are switched on, and {@code USE_RUNNER_V2_EXPERIMENT} is appended to the experiments
 * when it is not already present (compared case-insensitively).
 *
 * <p>The output of both the upsert and the removal step is handed to a dead-letter-queue writer
 * rooted at the severe DLQ directory.
 *
 * @param options template options; mutated as described above
 * @return the result handle returned by {@code Pipeline#run()}
 * @throws IOException declared for the setup helpers invoked while the pipeline is assembled
 */
public static PipelineResult run(BigtableChangeStreamsToVectorSearchOptions options)
    throws IOException {
  options.setStreaming(true);
  options.setEnableStreamingEngine(true);

  // Work on a private copy of the experiments: the list returned by the options object is not
  // guaranteed to be mutable, so appending to it in place can fail at launch time.
  List<String> experiments = new ArrayList<>();
  List<String> configuredExperiments = options.getExperiments();
  if (configuredExperiments != null) {
    experiments.addAll(configuredExperiments);
  }
  // Constant-first comparison so a null entry in the list cannot cause a NullPointerException.
  boolean hasUseRunnerV2 =
      experiments.stream().anyMatch(USE_RUNNER_V2_EXPERIMENT::equalsIgnoreCase);
  if (!hasUseRunnerV2) {
    experiments.add(USE_RUNNER_V2_EXPERIMENT);
  }
  options.setExperiments(experiments);

  // A missing (null) or empty start timestamp means "start reading from now".
  String startTimestampOption = options.getBigtableChangeStreamStartTimestamp();
  Instant startTimestamp =
      (startTimestampOption == null || startTimestampOption.isEmpty())
          ? Instant.now()
          : toInstant(Timestamp.parseTimestamp(startTimestampOption));

  String bigtableProjectId = getBigtableProjectId(options);
  String indexName = options.getVectorSearchIndex();

  LOG.info(" - startTimestamp {}", startTimestamp);
  LOG.info(" - bigtableReadInstanceId {}", options.getBigtableReadInstanceId());
  LOG.info(" - bigtableReadTableId {}", options.getBigtableReadTableId());
  LOG.info(" - bigtableChangeStreamAppProfile {}", options.getBigtableChangeStreamAppProfile());
  LOG.info(" - embeddingColumn {}", options.getEmbeddingColumn());
  LOG.info(" - crowdingTagColumn {}", options.getCrowdingTagColumn());
  LOG.info(" - project {}", options.getProject());
  LOG.info(" - indexName {}", indexName);

  // The service endpoint is derived from the region extracted from the index name.
  String vertexRegion = Utils.extractRegionFromIndexName(indexName);
  String vertexEndpoint = vertexRegion + "-aiplatform.googleapis.com:443";

  final Pipeline pipeline = Pipeline.create(options);
  DeadLetterQueueManager dlqManager = buildDlqManager(options);

  // Both branches below write to the same DLQ locations, so compute them once.
  String dlqDirectory = dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/";
  String dlqTmpDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";

  BigtableIO.ReadChangeStream readChangeStream =
      BigtableIO.readChangeStream()
          .withChangeStreamName(options.getBigtableChangeStreamName())
          .withExistingPipelineOptions(
              options.getBigtableChangeStreamResume()
                  ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                  : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
          .withProjectId(bigtableProjectId)
          .withAppProfileId(options.getBigtableChangeStreamAppProfile())
          .withInstanceId(options.getBigtableReadInstanceId())
          .withTableId(options.getBigtableReadTableId())
          .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
          .withMetadataTableTableId(options.getBigtableMetadataTableTableId())
          .withStartTime(startTimestamp);

  // The conversion step has two outputs: upserts on the main tag, removals on the side tag.
  PCollectionTuple results =
      pipeline
          .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
          .apply("Create Values", Values.create())
          .apply(
              "Converting to Vector Search Datapoints",
              ParDo.of(
                      new ChangeStreamMutationToDatapointOperationFn(
                          options.getEmbeddingColumn(),
                          options.getEmbeddingByteSize(),
                          options.getCrowdingTagColumn(),
                          Utils.parseColumnMapping(options.getAllowRestrictsMappings()),
                          Utils.parseColumnMapping(options.getDenyRestrictsMappings()),
                          Utils.parseColumnMapping(options.getIntNumericRestrictsMappings()),
                          Utils.parseColumnMapping(options.getFloatNumericRestrictsMappings()),
                          Utils.parseColumnMapping(options.getDoubleNumericRestrictsMappings())))
                  .withOutputTags(
                      ChangeStreamMutationToDatapointOperationFn.UPSERT_DATAPOINT_TAG,
                      TupleTagList.of(
                          ChangeStreamMutationToDatapointOperationFn.REMOVE_DATAPOINT_TAG)));

  // Upsert branch. Every element gets the same constant key so that GroupIntoBatches (which
  // operates on keyed input) can batch them; the key is dropped again right after batching.
  // NOTE(review): the step name "Add placeholer keys" is misspelled, and the step names
  // "Batch Contents", "Map to Values" and "Write errors to DLQ" are reused by the removal
  // branch. They are left untouched here because renaming steps of a streaming job may affect
  // updating already-running jobs - confirm before changing.
  results
      .get(ChangeStreamMutationToDatapointOperationFn.UPSERT_DATAPOINT_TAG)
      .apply("Add placeholer keys", WithKeys.of("placeholder"))
      .apply(
          "Batch Contents",
          GroupIntoBatches.<String, IndexDatapoint>ofSize(
                  bufferSizeOption(options.getUpsertMaxBatchSize()))
              .withMaxBufferingDuration(
                  bufferDurationOption(options.getUpsertMaxBufferDuration())))
      .apply("Map to Values", Values.create())
      .apply(
          "Upsert Datapoints to VectorSearch",
          ParDo.of(new UpsertDatapointsFn(vertexEndpoint, indexName)))
      .apply(
          "Write errors to DLQ",
          DLQWriteTransform.WriteDLQ.newBuilder()
              .withDlqDirectory(dlqDirectory)
              .withTmpDirectory(dlqTmpDirectory)
              .setIncludePaneInfo(true)
              .build());

  // Removal branch, same shape as the upsert branch but batching datapoint ids.
  // NOTE(review): the constant key "placeholer" is misspelled; it is kept byte-for-byte since
  // it is runtime data of the batching step.
  results
      .get(ChangeStreamMutationToDatapointOperationFn.REMOVE_DATAPOINT_TAG)
      .apply("Add placeholder keys", WithKeys.of("placeholer"))
      .apply(
          "Batch Contents",
          GroupIntoBatches.<String, String>ofSize(
                  bufferSizeOption(options.getDeleteMaxBatchSize()))
              .withMaxBufferingDuration(
                  bufferDurationOption(options.getDeleteMaxBufferDuration())))
      .apply("Map to Values", Values.create())
      .apply(
          "Remove Datapoints From VectorSearch",
          ParDo.of(new RemoveDatapointsFn(vertexEndpoint, indexName)))
      .apply(
          "Write errors to DLQ",
          DLQWriteTransform.WriteDLQ.newBuilder()
              .withDlqDirectory(dlqDirectory)
              .withTmpDirectory(dlqTmpDirectory)
              .setIncludePaneInfo(true)
              .build());

  return pipeline.run();
}