private Callable getLabeledResultSetTask()

in elastic-db-tools/src/main/java/com/microsoft/azure/elasticdb/query/multishard/MultiShardStatement.java [561:622]


    /**
     * Builds the task that executes one shard's statement and wraps the outcome in a {@link LabeledResultSet}.
     *
     * <p>When called, the returned task raises the shard-execution-began event, runs the statement through
     * {@code commandRetryPolicy}, and raises the shard-execution-succeeded event before returning the result. If the statement
     * produces a result set, that result set is validated and the reader-returned event is raised for it.
     *
     * @param behavior
     *            Command behavior; in this method it is only written to the log.
     * @param shardStatements
     *            Pair of the target shard location and the statement to run against it. The statement must be a
     *            {@link PreparedStatement}.
     * @param executionPolicy
     *            Decides how a result-set validation failure is handled: with {@code CompleteResults} the validation exception is
     *            thrown, otherwise it is stored in the returned {@link LabeledResultSet}.
     * @param commandRetryPolicy
     *            Policy through which the statement execution is invoked (presumably re-running it on transient failures).
     * @return A task yielding the labeled result set for the shard.
     * @throws ClassCastException
     *             if the statement in {@code shardStatements} is not a {@link PreparedStatement}.
     */
    private Callable<LabeledResultSet> getLabeledResultSetTask(CommandBehavior behavior,
            Pair<ShardLocation, Statement> shardStatements,
            MultiShardExecutionPolicy executionPolicy,
            RetryPolicy commandRetryPolicy) {
        final ShardLocation shard = shardStatements.getLeft();
        // The statement is never reassigned, so a plain final local is enough for the lambdas below to capture.
        final PreparedStatement statement = (PreparedStatement) shardStatements.getRight();
        return () -> {
            Stopwatch stopwatch = Stopwatch.createStarted();

            log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Starting command execution for Shard: {}; Behavior: {}; Retry Policy: {}",
                    shard, behavior, this.getRetryPolicy());

            // The per-shard command is about to execute.
            // Raise the shardExecutionBegan event.
            this.onShardExecutionBegan(shard);

            LabeledResultSet resultSet = commandRetryPolicy.executeAction(() -> {
                try {
                    LabeledResultSet labeledReader;
                    if (statement.execute()) {
                        ResultSet res = statement.getResultSet();

                        // Validate the result set
                        MultiShardException ex = validateResultSet(res, shard);
                        if (ex != null) {
                            // Under CompleteResults a shard with an invalid result set fails the whole task;
                            // otherwise the failure is recorded on the labeled result set for the caller to inspect.
                            if (executionPolicy.equals(MultiShardExecutionPolicy.CompleteResults)) {
                                throw ex;
                            }
                            labeledReader = new LabeledResultSet(ex, shard, statement);
                        }
                        else {
                            labeledReader = new LabeledResultSet(res, shard, statement);
                        }
                        // Raise the ShardExecutionReaderReturned event.
                        this.onShardExecutionReaderReturned(shard, labeledReader);
                    }
                    else {
                        // The statement did not produce a result set.
                        labeledReader = new LabeledResultSet(shard, statement);
                    }

                    return labeledReader;
                }
                catch (SQLException ex) {
                    // Deliberately do NOT stop the stopwatch here. This handler runs once per failed attempt, and stopping an
                    // already stopped stopwatch throws IllegalStateException (assuming this is Guava's Stopwatch, whose API
                    // this usage matches). Stopping here would break both a second failed attempt and the stop() on the
                    // success path after a retry. elapsed() can be read while the stopwatch is still running.
                    log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Command Execution Failed; Execution Time: {} ",
                            stopwatch.elapsed(TimeUnit.MILLISECONDS));

                    throw new MultiShardException(shard, ex);
                }
            });

            // Single stop point: reached only when the retried action finally succeeded.
            stopwatch.stop();

            log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Completed command execution for Shard: {}; Execution Time: {} ", shard,
                    stopwatch.elapsed(TimeUnit.MILLISECONDS));

            // Raise the ShardExecutionSucceeded event.
            this.onShardExecutionSucceeded(shard, resultSet);

            return resultSet;
        };
    }