public TableDestination getTableDestination()

in ingestion-beam/src/main/java/com/mozilla/telemetry/transforms/KeyByBigQueryTableDestination.java [51:126]


  /**
   * Resolves the BigQuery {@link TableDestination} for a message based on its attributes.
   *
   * <p>Namespace and docType attributes are normalized (snake_case, invalid characters removed)
   * to match the table-naming transformations applied by the jsonschema-transpiler and
   * mozilla-schema-generator, then substituted into the configured tableSpec template. The
   * resolved dataset and table are verified to exist via a cached table listing so that
   * nonexistent destinations are routed to the error collection instead of failing the pipeline.
   *
   * @param attributes message attribute map; not mutated (a defensive copy is made)
   * @return the resolved, partitioned and clustered {@code TableDestination}
   * @throws IllegalArgumentException if the template cannot be fully filled from the attributes,
   *     or if the resolved dataset or table does not exist
   */
  public TableDestination getTableDestination(Map<String, String> attributes) {
    // Defensive copy so the normalization below does not mutate the caller's map.
    attributes = new HashMap<>(attributes);

    // We coerce all docType and namespace names to be snake_case and to remove invalid
    // characters; these transformations MUST match with the transformations applied by the
    // jsonschema-transpiler and mozilla-schema-generator when creating table schemas in BigQuery.
    final String namespace = attributes.get(Attribute.DOCUMENT_NAMESPACE);
    final String docType = attributes.get(Attribute.DOCUMENT_TYPE);
    if (namespace != null) {
      attributes.put(Attribute.DOCUMENT_NAMESPACE, getAndCacheNormalizedName(namespace));
    }
    if (docType != null) {
      attributes.put(Attribute.DOCUMENT_TYPE, getAndCacheNormalizedName(docType));
    }

    // Only letters, numbers, and underscores are allowed in BigQuery dataset and table names,
    // but some doc types and namespaces contain '-', so we convert to '_'; we don't pass all
    // values through getAndCacheBqName to avoid expensive regex operations and polluting the
    // cache of transformed field names. Note: transformValues returns a lazy view, which is
    // fine here since the map is only read once by StringSubstitutor below.
    attributes = Maps.transformValues(attributes, v -> v.replaceAll("-", "_"));

    final String tableSpec = StringSubstitutor.replace(tableSpecTemplate, attributes);

    // Send to error collection if incomplete tableSpec; a leftover '$' means an unsubstituted
    // ${...} template variable, since '$' is not a valid char in tableSpecs.
    if (tableSpec.contains("$")) {
      throw new IllegalArgumentException("Element did not contain all the attributes needed to"
          + " fill out variables in the configured BigQuery output template: " + tableSpecTemplate);
    }

    final TableDestination tableDestination = new TableDestination(tableSpec, null,
        new TimePartitioning().setField(partitioningField),
        new Clustering().setFields(clusteringFields));
    final TableReference ref = BigQueryHelpers.parseTableSpec(tableSpec);
    final DatasetReference datasetRef = new DatasetReference().setProjectId(ref.getProjectId())
        .setDatasetId(ref.getDatasetId());

    // Lazily initialize the BigQuery client; this instance is reused across calls.
    if (bqService == null) {
      bqService = BigQueryOptions.newBuilder().setProjectId(ref.getProjectId())
          .setRetrySettings(RETRY_SETTINGS).build().getService();
    }

    // Get and cache a listing of table names for this dataset.
    Set<String> tablesInDataset;
    if (tableListingCache == null) {
      // We need to be very careful about settings for the cache here. We have had significant
      // issues in the past due to exceeding limits on BigQuery API requests; see
      // https://bugzilla.mozilla.org/show_bug.cgi?id=1623000
      tableListingCache = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(10))
          .build();
    }
    try {
      tablesInDataset = tableListingCache.get(datasetRef, () -> {
        Set<String> tableSet = new HashSet<>();
        Dataset dataset = bqService.getDataset(ref.getDatasetId());
        if (dataset != null) {
          dataset.list().iterateAll().forEach(t -> {
            tableSet.add(t.getTableId().getTable());
          });
        }
        return tableSet;
      });
    } catch (ExecutionException e) {
      // Cache loaders wrap failures in ExecutionException; rethrow the underlying cause
      // unchecked so it propagates to the error-handling path.
      throw new UncheckedExecutionException(e.getCause());
    }

    // Send to error collection if dataset or table doesn't exist so BigQueryIO doesn't throw a
    // pipeline execution exception.
    if (tablesInDataset.isEmpty()) {
      // Fixed message: previous concatenation ("has no " + " tables") produced a double space.
      throw new IllegalArgumentException("Resolved destination dataset does not exist or has no"
          + " tables for tableSpec " + tableSpec);
    } else if (!tablesInDataset.contains(ref.getTableId())) {
      throw new IllegalArgumentException("Resolved destination table does not exist: " + tableSpec);
    }

    return tableDestination;
  }