in dlp/snippets/src/main/java/dlp/snippets/RiskAnalysisKMap.java [67:202]
/**
 * Runs a k-map estimation risk analysis job against a BigQuery table and prints the resulting
 * anonymity histogram to standard output.
 *
 * <p>The job is configured to publish notifications to the given Pub/Sub topic. After creating
 * the job, this method subscribes to the given subscription and blocks for up to 15 minutes on a
 * future that is handed to {@code handleMessage} along with every received message. If the wait
 * times out, a notice is printed and the method returns without fetching any results.
 *
 * @param projectId project used for the BigQuery table, the Pub/Sub topic and subscription, and
 *     as the parent (location "global") of the DLP job
 * @param datasetId BigQuery dataset that contains the table
 * @param tableId BigQuery table to analyze
 * @param topicId Pub/Sub topic the job publishes its notifications to
 * @param subscriptionId Pub/Sub subscription this method listens on
 * @throws ExecutionException if the completion future finishes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting on the future
 * @throws IOException presumably when the DLP client cannot be created (verify against the
 *     client library)
 */
public static void calculateKMap(
    String projectId, String datasetId, String tableId, String topicId, String subscriptionId)
    throws ExecutionException, InterruptedException, IOException {
  // One client serves every request below; try-with-resources closes it when the method exits.
  try (DlpServiceClient client = DlpServiceClient.create()) {
    // The BigQuery table whose re-identification risk is estimated.
    BigQueryTable sourceTable =
        BigQueryTable.newBuilder()
            .setProjectId(projectId)
            .setDatasetId(datasetId)
            .setTableId(tableId)
            .build();

    // Quasi-identifier column names and, position by position, the info type of each column.
    List<String> columnNames = Arrays.asList("Age", "Gender");
    List<String> infoTypeNames = Arrays.asList("AGE", "GENDER");
    if (columnNames.size() != infoTypeNames.size()) {
      throw new IllegalArgumentException("The numbers of quasi-IDs and infoTypes must be equal!");
    }

    // Pair every column with its info type.
    List<TaggedField> quasiIdFields = new ArrayList<>();
    for (int idx = 0; idx < columnNames.size(); idx++) {
      FieldId column = FieldId.newBuilder().setName(columnNames.get(idx)).build();
      InfoType infoType = InfoType.newBuilder().setName(infoTypeNames.get(idx)).build();
      quasiIdFields.add(
          TaggedField.newBuilder().setField(column).setInfoType(infoType).build());
    }

    // Privacy metric: k-map estimation; the region may be any ISO-3166-1 region code.
    String regionCode = "US";
    PrivacyMetric kmapMetric =
        PrivacyMetric.newBuilder()
            .setKMapEstimationConfig(
                KMapEstimationConfig.newBuilder()
                    .addAllQuasiIds(quasiIdFields)
                    .setRegionCode(regionCode)
                    .build())
            .build();

    // Action that publishes job status notifications to the Pub/Sub topic.
    ProjectTopicName topic = ProjectTopicName.of(projectId, topicId);
    Action notifyAction =
        Action.newBuilder()
            .setPubSub(PublishToPubSub.newBuilder().setTopic(topic.toString()).build())
            .build();

    // Assemble the risk job and the request that creates it.
    RiskAnalysisJobConfig riskJob =
        RiskAnalysisJobConfig.newBuilder()
            .setSourceTable(sourceTable)
            .setPrivacyMetric(kmapMetric)
            .addActions(notifyAction)
            .build();
    CreateDlpJobRequest createRequest =
        CreateDlpJobRequest.newBuilder()
            .setParent(LocationName.of(projectId, "global").toString())
            .setRiskJob(riskJob)
            .build();

    // Submit the job.
    DlpJob dlpJob = client.createDlpJob(createRequest);

    // Listen on the subscription; each message is forwarded to handleMessage with the future.
    SettableApiFuture<Boolean> jobFinished = SettableApiFuture.create();
    ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);
    MessageReceiver receiver =
        (message, ackReply) -> handleMessage(dlpJob, jobFinished, message, ackReply);
    Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
    subscriber.startAsync();

    // Block (with an upper bound) until the future completes. For long jobs, consider a truly
    // asynchronous execution model such as Cloud Functions.
    try {
      jobFinished.get(15, TimeUnit.MINUTES);
    } catch (TimeoutException e) {
      System.out.println("Job was not completed after 15 minutes.");
      return;
    } finally {
      // The subscriber is shut down on every path, including the timeout return above.
      subscriber.stopAsync();
      subscriber.awaitTerminated();
    }

    // Fetch the job again to read its state and results.
    DlpJob completedJob =
        client.getDlpJob(GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build());
    System.out.println("Job status: " + completedJob.getState());
    System.out.println("Job name: " + dlpJob.getName());

    // Print every histogram bucket together with the quasi-identifier values it contains.
    KMapEstimationResult estimation = completedJob.getRiskDetails().getKMapEstimationResult();
    for (KMapEstimationHistogramBucket bucket : estimation.getKMapEstimationHistogramList()) {
      System.out.printf(
          "\tAnonymity range: [%d, %d]\n", bucket.getMinAnonymity(), bucket.getMaxAnonymity());
      System.out.printf("\tSize: %d\n", bucket.getBucketSize());

      for (KMapEstimationQuasiIdValues entry : bucket.getBucketValuesList()) {
        // Each value is rendered via toString(); only the text after the first ':' is kept.
        String joinedValues =
            entry.getQuasiIdsValuesList().stream()
                .map(value -> value.toString())
                .map(text -> text.substring(text.indexOf(':') + 1).trim())
                .collect(Collectors.joining(", "));
        System.out.printf("\tValues: {%s}\n", joinedValues);
        System.out.printf(
            "\tEstimated k-map anonymity: %d\n", entry.getEstimatedAnonymity());
      }
    }
  }
}