in src/main/java/com/google/cloud/dfmetrics/output/BigqueryStore.java [104:168]
/**
 * Writes one row describing the given job and its metrics to the BigQuery table identified by
 * {@code tableReference}, using a streaming {@code insertAll} request.
 *
 * <p>Job parameters and metrics are each converted to a list of {@code name}/{@code value}
 * records. A {@code null} parameters map results in an empty list; the optional template fields
 * are added through {@code addIfValueExists}.
 *
 * <p>An {@link IllegalStateException} raised while building or sending the request is logged
 * and not rethrown.
 *
 * @param jobInfo job metadata used to populate the row
 * @param metrics metric name to value pairs stored in the repeated {@code metrics} field
 * @throws RuntimeException if the insert response reports errors; the message lists every
 *     failed entry together with its errors
 */
public void load(JobInfo jobInfo, Map<String, Double> metrics) {
  LOG.debug("Loading metrics:\n{}", PrettyLogger.logMap(metrics));
  try {
    // Create a bq service instance
    BigQuery bigquery =
        BigQueryOptions.newBuilder()
            .setProjectId(tableReference.getProjectId())
            .build()
            .getService();

    // Create a table row
    Map<String, Object> row = new HashMap<>();
    row.put("run_timestamp", Instant.now().toString());
    row.put("job_create_timestamp", jobInfo.createTime());
    row.put("sdk", jobInfo.sdk());
    row.put("version", jobInfo.sdkVersion());
    row.put("job_type", jobInfo.jobType());
    addIfValueExists(row, "template_name", jobInfo.templateName());
    addIfValueExists(row, "template_version", jobInfo.templateVersion());
    addIfValueExists(row, "template_type", jobInfo.templateType());
    row.put("pipeline_name", jobInfo.pipelineName());

    // Convert parameters map to list of table row since it's a repeated record
    List<TableRow> parameterRows = new ArrayList<>();
    if (jobInfo.parameters() != null) {
      for (Map.Entry<String, String> entry : jobInfo.parameters().entrySet()) {
        TableRow parameterRow =
            new TableRow().set("name", entry.getKey()).set("value", entry.getValue());
        parameterRows.add(parameterRow);
      }
    }
    row.put("parameters", parameterRows);

    // Convert metrics map to list of table row since it's a repeated record
    List<TableRow> metricRows = new ArrayList<>();
    for (Map.Entry<String, Double> entry : metrics.entrySet()) {
      TableRow metricRow =
          new TableRow().set("name", entry.getKey()).set("value", entry.getValue());
      metricRows.add(metricRow);
    }
    row.put("metrics", metricRows);

    // Create insertAll (streaming) request
    InsertAllRequest insertAllRequest =
        InsertAllRequest.newBuilder(tableReference.getDatasetId(), tableReference.getTableId())
            .addRow(row)
            .build();

    // Insert data into table
    InsertAllResponse response = bigquery.insertAll(insertAllRequest);

    // Check for errors and raise exception
    if (response.hasErrors()) {
      StringBuilder errors = new StringBuilder();
      for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
        // Format into a String: System.out.printf returns the PrintStream itself, so appending
        // its result would record the stream's toString() instead of the error details.
        errors.append(
            String.format(
                "error in entry %d: %s \n", entry.getKey(), entry.getValue().toString()));
      }
      // Not an IllegalStateException, so this propagates past the catch below to the caller.
      throw new RuntimeException("Errors inserting to BigQuery:" + errors);
    }
  } catch (IllegalStateException e) {
    LOG.error("Unable to export results to bigquery. ", e);
  }
}