public void load()

in src/main/java/com/google/cloud/dfmetrics/output/BigqueryStore.java [104:168]


  /**
   * Streams one row describing a job run and its metrics into the BigQuery table identified by
   * {@code tableReference}.
   *
   * <p>The row carries the current time as {@code run_timestamp}, the job attributes read from
   * {@code jobInfo} (template fields are passed through {@code addIfValueExists}), and two
   * repeated records of {@code name}/{@code value} pairs: {@code parameters} and {@code metrics}.
   *
   * <p>An {@link IllegalStateException} raised while building or sending the row is logged and
   * not rethrown.
   *
   * @param jobInfo job attributes to record; its {@code parameters()} may be {@code null}, in
   *     which case an empty {@code parameters} list is written
   * @param metrics metric name to value pairs to record
   * @throws RuntimeException if the insert response reports row-level errors; the message lists
   *     each failing row index together with its errors
   */
  public void load(JobInfo jobInfo, Map<String, Double> metrics) {
    LOG.debug("Loading metrics:\n{}", PrettyLogger.logMap(metrics));
    try {
      // Create a bq service instance
      BigQuery bigquery =
          BigQueryOptions.newBuilder()
              .setProjectId(tableReference.getProjectId())
              .build()
              .getService();

      // Create a table row
      Map<String, Object> row = new HashMap<>();
      row.put("run_timestamp", Instant.now().toString());
      row.put("job_create_timestamp", jobInfo.createTime());
      row.put("sdk", jobInfo.sdk());
      row.put("version", jobInfo.sdkVersion());
      row.put("job_type", jobInfo.jobType());
      addIfValueExists(row, "template_name", jobInfo.templateName());
      addIfValueExists(row, "template_version", jobInfo.templateVersion());
      addIfValueExists(row, "template_type", jobInfo.templateType());
      row.put("pipeline_name", jobInfo.pipelineName());

      // Convert parameters map to list of table row since it's a repeated record
      List<TableRow> parameterRows = new ArrayList<>();
      if (jobInfo.parameters() != null) {
        for (Map.Entry<String, String> entry : jobInfo.parameters().entrySet()) {
          TableRow parameterRow =
              new TableRow().set("name", entry.getKey()).set("value", entry.getValue());
          parameterRows.add(parameterRow);
        }
      }
      row.put("parameters", parameterRows);

      // Convert metrics map to list of table row since it's a repeated record
      List<TableRow> metricRows = new ArrayList<>();
      for (Map.Entry<String, Double> entry : metrics.entrySet()) {
        TableRow metricRow =
            new TableRow().set("name", entry.getKey()).set("value", entry.getValue());
        metricRows.add(metricRow);
      }
      row.put("metrics", metricRows);

      // Create insertAll (streaming) request
      InsertAllRequest insertAllRequest =
          InsertAllRequest.newBuilder(tableReference.getDatasetId(), tableReference.getTableId())
              .addRow(row)
              .build();

      // Insert data into table
      InsertAllResponse response = bigquery.insertAll(insertAllRequest);

      // Check for errors and raise exception
      if (response.hasErrors()) {
        StringBuilder errors = new StringBuilder();
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          // Format into a String: PrintStream.printf returns the stream itself, so appending its
          // result would record the stream's identity instead of the error details.
          errors.append(
              String.format("error in entry %d: %s \n", entry.getKey(), entry.getValue()));
        }
        throw new RuntimeException("Errors inserting to BigQuery:" + errors);
      }
    } catch (IllegalStateException e) {
      // Only IllegalStateException is handled here; the RuntimeException thrown above for insert
      // errors is not a subtype of it and therefore propagates to the caller.
      LOG.error("Unable to export results to bigquery. ", e);
    }
  }