protected void addShardAsync()

in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java [125:183]


    protected void addShardAsync(final Entity replicaSet) {
        final Duration timeout = Duration.minutes(20);
        final Stopwatch stopwatch = Stopwatch.createStarted();
        final AtomicInteger attempts = new AtomicInteger();
        
        // TODO Don't use executor, use ExecutionManager; but following pattern in MongoDBReplicaSetImpl for now.
        executor.submit(new Runnable() {
            @Override
            public void run() {
                boolean reschedule;
                MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
                if (router == null) {
                    LOG.debug("Rescheduling adding shard {} because no running router for cluster {}", replicaSet, this);
                    reschedule = true;
                } else {
                    MongoDBClientSupport client;
                    try {
                        client = MongoDBClientSupport.forServer(router);
                    } catch (UnknownHostException e) {
                        throw Exceptions.propagate(e);
                    }

                    try {
                        MongoDBServer primary = replicaSet.getAttribute(MongoDBReplicaSet.PRIMARY_ENTITY);
                        if (primary != null) {
                            String addr = String.format("%s:%d", primary.getAttribute(MongoDBServer.SUBNET_HOSTNAME), primary.getAttribute(MongoDBServer.PORT));
                            String replicaSetURL = ((MongoDBReplicaSet) replicaSet).getName() + "/" + addr;
                            boolean added = client.addShardToRouter(replicaSetURL);
                            if (added) {
                                LOG.info("{} added shard {} via {}", new Object[]{MongoDBShardClusterImpl.this, replicaSetURL, router});
                                addedMembers.add(replicaSet);
                                reschedule = false;
                            } else {
                                LOG.debug("Rescheduling addition of shard {} because add failed via router {}", replicaSetURL, router);
                                reschedule = true;
                            }
                        } else {
                            LOG.debug("Rescheduling addition of shard {} because primary is null", replicaSet);
                            reschedule = true;
                        }
                    } catch (Exception e) {
                        LOG.error("Failed to add shard to router {}:  ", router,  e);
                        throw Exceptions.propagate(e);
                    }
                }
                
                if (reschedule) {
                    int numAttempts = attempts.incrementAndGet();
                    if (numAttempts > 1 && timeout.toMilliseconds() > stopwatch.elapsed(TimeUnit.MILLISECONDS)) {
                        executor.schedule(this, 3, TimeUnit.SECONDS);
                    } else {
                        LOG.warn("Timeout after {} attempts ({}) adding shard {}; aborting", 
                                new Object[] {numAttempts, Time.makeTimeStringRounded(stopwatch), replicaSet});
                        addingMembers.remove(replicaSet);
                    }
                }
            }
        });
    }