in src/main/java/com/google/cloud/dfmetrics/output/FileStore.java [57:93]
public void load(JobInfo jobInfo, Map<String, Double> metrics) {
  // Builds one JSON record from the job details and the collected metrics and writes it to
  // `filePath`, either on the local file system (path without a URI scheme) or to Google Cloud
  // Storage (scheme equal to GCS_URI).
  //
  // jobInfo: source of the job-level fields copied into the record.
  // metrics: metric name -> value map, stored under the "metrics" key.
  //
  // Throws RuntimeException when the scheme is unsupported, when `filePath` is not a valid URI,
  // or when the local write fails (the IOException / URISyntaxException is kept as the cause).
  LOG.info("Writing metrics to file path:\n{}", filePath);

  // Create the record from the provided details
  Map<String, Object> record = new HashMap<>();
  record.put("run_timestamp", Instant.now().toString());
  record.put("job_create_timestamp", jobInfo.createTime());
  record.put("sdk", jobInfo.sdk());
  record.put("sdk_version", jobInfo.sdkVersion());
  record.put("job_type", jobInfo.jobType());
  // NOTE(review): addIfValueExists presumably skips the key when the value is absent — confirm
  // against the helper's definition.
  addIfValueExists(record, "template_name", jobInfo.templateName());
  addIfValueExists(record, "template_version", jobInfo.templateVersion());
  record.put("template_type", jobInfo.templateType());
  record.put("pipeline_name", jobInfo.pipelineName());
  record.put("parameters", jobInfo.parameters());
  record.put("metrics", metrics);

  // NOTE(review): the boolean flag is presumably "pretty print" — confirm in
  // MetricsCollectorUtils.
  String json = MetricsCollectorUtils.serializeToJson(record, true);
  byte[] contents = json.getBytes(StandardCharsets.UTF_8);

  // Write to file path
  try {
    URI uri = new URI(filePath);
    String scheme = uri.getScheme();
    if (scheme == null) { // Write to local file system
      // TRUNCATE_EXISTING is required: once any option is passed explicitly, Files.write no
      // longer applies its default (CREATE, TRUNCATE_EXISTING, WRITE). With CREATE alone, an
      // existing longer file would keep its old tail after the new JSON, corrupting the output.
      Files.write(
          Paths.get(filePath),
          contents,
          StandardOpenOption.CREATE,
          StandardOpenOption.TRUNCATE_EXISTING,
          StandardOpenOption.WRITE);
    } else if (scheme.equals(GCS_URI)) { // Write to Google cloud storage
      Storage storageClient = StorageOptions.getDefaultInstance().getService();
      BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.fromGsUtilUri(filePath)).build();
      storageClient.create(blobInfo, contents);
    } else {
      throw new RuntimeException(
          "Invalid file scheme. Supported FileSystems are local, Google Cloud Storage");
    }
  } catch (IOException | URISyntaxException e) {
    throw new RuntimeException(e);
  }
}