in src/main/java/com/google/cloud/dfmetrics/pipelinemanager/MetricsManager.java [305:333]
private Optional<TimeInterval> getWorkerTimeInterval() throws ParseException {
TimeInterval.Builder builder = TimeInterval.newBuilder();
List<JobMessage> messages =
listMessages(
job().getProjectId(), job().getLocation(), job().getId(), "JOB_MESSAGE_DETAILED");
String startTime = null, endTime = null;
for (JobMessage jobMessage : messages) {
if (jobMessage.getMessageText() != null && !jobMessage.getMessageText().isEmpty()) {
if (WORKER_START_PATTERN.matcher(jobMessage.getMessageText()).find()) {
LOG.info("Found worker start message in job messages.");
startTime = jobMessage.getTime();
}
if (WORKER_STOP_PATTERN.matcher(jobMessage.getMessageText()).find()) {
LOG.info("Found worker stop message in job messages.");
endTime = jobMessage.getTime();
}
}
}
if (startTime != null && endTime != null) {
return Optional.of(
builder
.setStartTime(Timestamps.parse(startTime))
.setEndTime(Timestamps.parse(endTime))
.build());
}
return Optional.empty();
}