in ingestion-beam/src/main/java/com/mozilla/telemetry/transforms/KeyByBigQueryTableDestination.java [51:126]
public TableDestination getTableDestination(Map<String, String> attributes) {
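// Copy into a mutable map so we can normalize attribute values without mutating the
// caller's (possibly immutable) view.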
attributes = new HashMap<>(attributes);
// We coerce all docType and namespace names to snake_case and strip invalid characters;
// these transformations MUST match the transformations applied by the
// jsonschema-transpiler and mozilla-schema-generator when creating table schemas in BigQuery.
final String namespace = attributes.get(Attribute.DOCUMENT_NAMESPACE);
final String docType = attributes.get(Attribute.DOCUMENT_TYPE);
if (namespace != null) {
attributes.put(Attribute.DOCUMENT_NAMESPACE, getAndCacheNormalizedName(namespace));
}
if (docType != null) {
attributes.put(Attribute.DOCUMENT_TYPE, getAndCacheNormalizedName(docType));
}
// Only letters, numbers, and underscores are allowed in BigQuery dataset and table names,
// but some doc types and namespaces contain '-', so we convert to '_'; we don't pass all
// values through getAndCacheNormalizedName to avoid expensive regex operations and
// polluting the cache of transformed field names.
attributes = Maps.transformValues(attributes, v -> v.replace("-", "_"));
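// Fill each ${attribute} placeholder in the template from the (normalized) attributes;
// e.g. with illustrative values only, a template of
// "my_project:${document_namespace}_live.${document_type}_v${document_version}" and
// namespace "telemetry", docType "main", version "4" resolves to
// "my_project:telemetry_live.main_v4".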
final String tableSpec = StringSubstitutor.replace(tableSpecTemplate, attributes);
// Send to error collection if the tableSpec is incomplete; an unsubstituted ${variable}
// placeholder leaves behind a '$', which is not a valid character in a tableSpec.
if (tableSpec.contains("$")) {
throw new IllegalArgumentException("Element did not contain all the attributes needed to"
+ " fill out variables in the configured BigQuery output template: " + tableSpecTemplate);
}
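// Partitioning and clustering settings travel with the destination; BigQueryIO applies
// them only if it ends up creating the table itself.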
final TableDestination tableDestination = new TableDestination(tableSpec, null,
new TimePartitioning().setField(partitioningField),
new Clustering().setFields(clusteringFields));
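// Parse the resolved spec back into project/dataset/table components so we can verify
// below that the destination actually exists before handing it to BigQueryIO.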
final TableReference ref = BigQueryHelpers.parseTableSpec(tableSpec);
final DatasetReference datasetRef = new DatasetReference().setProjectId(ref.getProjectId())
.setDatasetId(ref.getDatasetId());
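// Lazily instantiate the BigQuery client on first use; clients like this are generally
// not serializable, so constructing one on the worker avoids shipping it with the DoFn
// (an inference from the lazy-init pattern here).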
if (bqService == null) {
bqService = BigQueryOptions.newBuilder().setProjectId(ref.getProjectId())
.setRetrySettings(RETRY_SETTINGS).build().getService();
}
// Get and cache a listing of table names for this dataset.
Set<String> tablesInDataset;
if (tableListingCache == null) {
// We need to be very careful about settings for the cache here. We have had significant
// issues in the past due to exceeding limits on BigQuery API requests; see
// https://bugzilla.mozilla.org/show_bug.cgi?id=1623000
tableListingCache = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(10))
.build();
}
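// The 10-minute TTL bounds the rate of tables.list calls to roughly one per dataset per
// worker per expiry window; the tradeoff is that a newly created table can take up to
// 10 minutes to become routable.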
try {
tablesInDataset = tableListingCache.get(datasetRef, () -> {
Set<String> tableSet = new HashSet<>();
Dataset dataset = bqService.getDataset(ref.getDatasetId());
if (dataset != null) {
dataset.list().iterateAll().forEach(t -> tableSet.add(t.getTableId().getTable()));
}
return tableSet;
});
} catch (ExecutionException e) {
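// Unwrap the loader's failure and rethrow unchecked so it surfaces like any other
// routing error rather than forcing a checked exception on callers.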
throw new UncheckedExecutionException(e.getCause());
}
// Send to error collection if dataset or table doesn't exist so BigQueryIO doesn't throw a
// pipeline execution exception.
if (tablesInDataset.isEmpty()) {
throw new IllegalArgumentException("Resolved destination dataset does not exist or has no "
+ " tables for tableSpec " + tableSpec);
} else if (!tablesInDataset.contains(ref.getTableId())) {
throw new IllegalArgumentException("Resolved destination table does not exist: " + tableSpec);
}
return tableDestination;
}
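
// A minimal sketch of the snake_case normalization that getAndCacheNormalizedName is
// described as performing above. The field name, regexes, and cache sizing below are
// invented for illustration; the real implementation elsewhere in this class may differ.
private final Cache<String, String> normalizedNameCacheSketch = CacheBuilder.newBuilder()
.maximumSize(50_000).build();

private String getAndCacheNormalizedNameSketch(String name) {
try {
return normalizedNameCacheSketch.get(name, () -> name
// Break camelCase at lowercase/digit-to-uppercase boundaries, then lowercase.
.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase()
// Collapse anything that is not a letter, digit, or underscore to '_'.
.replaceAll("[^a-z0-9_]", "_"));
} catch (ExecutionException e) {
throw new UncheckedExecutionException(e.getCause());
}
}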