in services/library/src/main/java/com/google/cloud/pso/bq_pii_classifier/services/pubsub/PubSubServiceImpl.java [36:71]
/**
 * Publishes every message to the given Pub/Sub topic and reports the outcome per message.
 *
 * <p>Each message is serialized with {@code toJsonString()} and sent as UTF-8 data. All messages
 * are handed to the publisher first and their futures are awaited afterwards, so the client is
 * free to batch them instead of paying one blocking round trip per message. Outcomes are still
 * collected in the iteration order of {@code messages}.
 *
 * @param projectId project that owns the topic
 * @param topicId id of the topic to publish to
 * @param messages messages to publish
 * @return the messages that were published (with their message ids) and the messages whose
 *     publish future failed (with the exception raised while waiting for it)
 * @throws IOException e.g. when the publisher cannot be built
 * @throws InterruptedException if the calling thread is interrupted while waiting for a publish
 *     result or for the publisher to terminate
 */
public PubSubPublishResults publishTableOperationRequests(String projectId, String topicId, List<JsonMessage> messages)
        throws IOException, InterruptedException {
    List<SuccessPubSubMessage> successMessages = new ArrayList<>();
    List<FailedPubSubMessage> failedMessages = new ArrayList<>();
    Publisher publisher = null;
    try {
        TopicName topicName = TopicName.of(projectId, topicId);
        // Create a publisher instance with default settings bound to the topic
        publisher = Publisher.newBuilder(topicName).build();

        // Phase 1: enqueue everything without blocking, keeping one future per message
        // (same order as the input).
        List<ApiFuture<String>> pendingPublishes = new ArrayList<>(messages.size());
        for (final JsonMessage msg : messages) {
            ByteString data = ByteString.copyFromUtf8(msg.toJsonString());
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
            // Once published, the future yields a server-assigned message id (unique within the topic)
            pendingPublishes.add(publisher.publish(pubsubMessage));
        }

        // Phase 2: wait for each result and sort the message into success / failure.
        int index = 0;
        for (final JsonMessage msg : messages) {
            ApiFuture<String> future = pendingPublishes.get(index++);
            try {
                String messageId = future.get();
                successMessages.add(new SuccessPubSubMessage(msg, messageId));
            } catch (InterruptedException ex) {
                // An interrupt is a request to stop, not a publish failure of this message:
                // propagate it instead of recording it and continuing to block on later futures.
                throw ex;
            } catch (Exception ex) {
                failedMessages.add(new FailedPubSubMessage(msg, ex));
            }
        }
        return new PubSubPublishResults(successMessages, failedMessages);
    } finally {
        if (publisher != null) {
            // When finished with the publisher, shutdown to free up resources.
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}