in gobblin-metastore/src/main/java/org/apache/gobblin/metastore/database/DatabaseJobHistoryStoreV100.java [184:272]
public synchronized void put(JobExecutionInfo jobExecutionInfo) throws IOException {
Optional<Connection> connectionOptional = Optional.absent();
try {
connectionOptional = Optional.of(getConnection());
Connection connection = connectionOptional.get();
connection.setAutoCommit(false);
// Insert or update job execution information
if (existsJobExecutionInfo(connection, jobExecutionInfo)) {
updateJobExecutionInfo(connection, jobExecutionInfo);
} else {
insertJobExecutionInfo(connection, jobExecutionInfo);
}
// Insert or update job metrics
if (jobExecutionInfo.hasMetrics()) {
for (Metric metric : jobExecutionInfo.getMetrics()) {
boolean insert =
!existsMetric(connection, JOB_METRIC_EXIST_QUERY_STATEMENT_TEMPLATE, jobExecutionInfo.getJobId(), metric);
updateMetric(connection, insert ? JOB_METRIC_INSERT_STATEMENT_TEMPLATE : JOB_METRIC_UPDATE_STATEMENT_TEMPLATE,
jobExecutionInfo.getJobId(), metric, insert);
}
}
// Insert or update job properties
if (jobExecutionInfo.hasJobProperties()) {
for (Map.Entry<String, String> entry : jobExecutionInfo.getJobProperties().entrySet()) {
boolean insert = !existsProperty(connection, JOB_PROPERTY_EXIST_QUERY_STATEMENT_TEMPLATE,
jobExecutionInfo.getJobId(), entry.getKey());
updateProperty(connection,
insert ? JOB_PROPERTY_INSERT_STATEMENT_TEMPLATE : JOB_PROPERTY_UPDATE_STATEMENT_TEMPLATE,
jobExecutionInfo.getJobId(), entry.getKey(), entry.getValue(), insert);
}
}
// Insert or update task execution information
if (jobExecutionInfo.hasTaskExecutions()) {
for (TaskExecutionInfo info : jobExecutionInfo.getTaskExecutions()) {
// Insert or update task execution information
if (existsTaskExecutionInfo(connection, info)) {
updateTaskExecutionInfo(connection, info);
} else {
insertTaskExecutionInfo(connection, info);
}
// Insert or update task metrics
if (info.hasMetrics()) {
for (Metric metric : info.getMetrics()) {
boolean insert =
!existsMetric(connection, TASK_METRIC_EXIST_QUERY_STATEMENT_TEMPLATE, info.getTaskId(), metric);
updateMetric(connection,
insert ? TASK_METRIC_INSERT_STATEMENT_TEMPLATE : TASK_METRIC_UPDATE_STATEMENT_TEMPLATE,
info.getTaskId(), metric, insert);
}
}
// Insert or update task properties
if (info.hasTaskProperties()) {
for (Map.Entry<String, String> entry : info.getTaskProperties().entrySet()) {
boolean insert = !existsProperty(connection, TASK_PROPERTY_EXIST_QUERY_STATEMENT_TEMPLATE,
info.getTaskId(), entry.getKey());
updateProperty(connection,
insert ? TASK_PROPERTY_INSERT_STATEMENT_TEMPLATE : TASK_PROPERTY_UPDATE_STATEMENT_TEMPLATE,
info.getTaskId(), entry.getKey(), entry.getValue(), insert);
}
}
}
}
connection.commit();
} catch (SQLException se) {
LOGGER.error("Failed to put a new job execution information record", se);
if (connectionOptional.isPresent()) {
try {
connectionOptional.get().rollback();
} catch (SQLException se1) {
LOGGER.error("Failed to rollback", se1);
}
}
throw new IOException(se);
} finally {
if (connectionOptional.isPresent()) {
try {
connectionOptional.get().close();
} catch (SQLException se) {
LOGGER.error("Failed to close connection", se);
}
}
}
}