in elastic-db-tools/src/main/java/com/microsoft/azure/elasticdb/query/multishard/MultiShardStatement.java [561:622]
private Callable<LabeledResultSet> getLabeledResultSetTask(CommandBehavior behavior,
Pair<ShardLocation, Statement> shardStatements,
MultiShardExecutionPolicy executionPolicy,
RetryPolicy commandRetryPolicy) {
ShardLocation shard = shardStatements.getLeft();
AtomicReference<PreparedStatement> statement = new AtomicReference<>((PreparedStatement) shardStatements.getRight());
return () -> {
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Starting command execution for" + "Shard: {}; Behavior: {}; Retry Policy: {}",
shard, behavior, this.getRetryPolicy());
// The per-shard command is about to execute.
// Raise the shardExecutionBegan event.
this.onShardExecutionBegan(shard);
LabeledResultSet resultSet = commandRetryPolicy.executeAction(() -> {
try {
LabeledResultSet labeledReader;
if (statement.get().execute()) {
ResultSet res = statement.get().getResultSet();
// Validate the result set
MultiShardException ex = validateResultSet(res, shard);
if (ex != null) {
if (executionPolicy.equals(MultiShardExecutionPolicy.CompleteResults)) {
throw ex;
}
labeledReader = new LabeledResultSet(ex, shard, statement.get());
}
else {
labeledReader = new LabeledResultSet(res, shard, statement.get());
}
// Raise the ShardExecutionReaderReturned event.
this.onShardExecutionReaderReturned(shard, labeledReader);
}
else {
labeledReader = new LabeledResultSet(shard, statement.get());
}
return labeledReader;
}
catch (SQLException ex) {
stopwatch.stop();
log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Command Execution Failed; " + "Execution Time: {} ",
stopwatch.elapsed(TimeUnit.MILLISECONDS));
throw new MultiShardException(shard, ex);
}
});
stopwatch.stop();
log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Completed command execution for" + "Shard: {}; Execution Time: {} ", shard,
stopwatch.elapsed(TimeUnit.MILLISECONDS));
// Raise the ShardExecutionSucceeded event.
this.onShardExecutionSucceeded(shard, resultSet);
return resultSet;
};
}