public void submitTopologyWithOpts()

in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [3243:3413]


    public void submitTopologyWithOpts(String topoName, String uploadedJarLocation, String jsonConf,
                                       StormTopology topology, SubmitOptions options)
        throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException {
        try {
            submitTopologyWithOptsCalls.mark();
            assertIsLeader();
            if (options == null) {
                throw new InvalidTopologyException("Cannot submitTopologyWithOpts: SubmitOptions parameter value is null");
            }
            validateTopologyName(topoName);
            checkAuthorization(topoName, null, "submitTopology");
            assertTopoActive(topoName, false);
            @SuppressWarnings("unchecked")
            Map<String, Object> topoConf = (Map<String, Object>) JSONValue.parse(jsonConf);
            try {
                ConfigValidation.validateTopoConf(topoConf);
            } catch (IllegalArgumentException ex) {
                throw new WrappedInvalidTopologyException(ex.getMessage());
            }
            validator.validate(topoName, topoConf, topology);
            if ((boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
                @SuppressWarnings("unchecked")
                Map<String, Object> blobMap = (Map<String, Object>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
                if (blobMap != null && !blobMap.isEmpty()) {
                    throw new WrappedInvalidTopologyException("symlinks are disabled so blobs are not supported but "
                                                       + Config.TOPOLOGY_BLOBSTORE_MAP + " = " + blobMap);
                }
            }
            ServerUtils.validateTopologyWorkerMaxHeapSizeConfigs(topoConf, topology,
                                                     ObjectReader.getDouble(conf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)));
            Utils.validateTopologyBlobStoreMap(topoConf, blobStore);
            long uniqueNum = submittedCount.incrementAndGet();
            String topoId = topoName + "-" + uniqueNum + "-" + Time.currentTimeSecs();
            Map<String, String> creds = null;
            if (options.is_set_creds()) {
                creds = options.get_creds().get_creds();
            }
            topoConf.put(Config.STORM_ID, topoId);
            topoConf.put(Config.TOPOLOGY_NAME, topoName);
            topoConf = normalizeConf(conf, topoConf, topology);

            OciUtils.adjustImageConfigForTopo(conf, topoConf, topoId);

            ReqContext req = ReqContext.context();
            Principal principal = req.principal();
            String submitterPrincipal = principal == null ? null : principalToLocal.toLocal(principal);
            Set<String> topoAcl = new HashSet<>(ObjectReader.getStrings(topoConf.get(Config.TOPOLOGY_USERS)));
            topoAcl.add(submitterPrincipal);
            String submitterUser = principalToLocal.toLocal(principal);
            topoAcl.add(submitterUser);

            String topologyPrincipal = Utils.OR(submitterPrincipal, "");
            topoConf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, topologyPrincipal);
            String systemUser = System.getProperty("user.name");
            String topologyOwner = Utils.OR(submitterUser, systemUser);
            topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, topologyOwner); //Don't let the user set who we launch as
            topoConf.put(Config.TOPOLOGY_USERS, new ArrayList<>(topoAcl));
            topoConf.put(Config.STORM_ZOOKEEPER_SUPERACL, conf.get(Config.STORM_ZOOKEEPER_SUPERACL));
            if (!Utils.isZkAuthenticationConfiguredStormServer(conf)) {
                topoConf.remove(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME);
                topoConf.remove(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
            }
            if (!(Boolean) conf.getOrDefault(DaemonConfig.STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED, false)) {
                topoConf.remove(Config.TOPOLOGY_CLASSPATH_BEGINNING);
            }

            String topoVersionString = topology.get_storm_version();
            if (topoVersionString == null) {
                topoVersionString = (String) conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion());
            }
            //Check if we can run a topology with that version of storm.
            SimpleVersion topoVersion = new SimpleVersion(topoVersionString);
            List<String> cp = Utils.getCompatibleVersion(supervisorClasspaths, topoVersion, "classpath", null);
            if (cp == null) {
                throw new WrappedInvalidTopologyException("Topology submitted with storm version " + topoVersionString
                                                   + " but could not find a configured compatible version to use "
                                                   + supervisorClasspaths.keySet());
            }
            Map<String, Object> otherConf = Utils.getConfigFromClasspath(cp, conf);
            Map<String, Object> totalConfToSave = Utils.merge(otherConf, topoConf);
            Map<String, Object> totalConf = Utils.merge(conf, totalConfToSave);


            //When reading the conf in nimbus we want to fall back to our own settings
            // if the other config does not have it set.
            topology = normalizeTopology(totalConf, topology);

            // if the Resource Aware Scheduler is used,
            // we might need to set the number of acker executors and eventlogger executors to be the estimated number of workers.
            if (ServerUtils.isRas(conf)) {
                int estimatedNumWorker = ServerUtils.getEstimatedWorkerCountForRasTopo(totalConf, topology);

                setUpAckerExecutorConfigs(topoName, totalConfToSave, totalConf, estimatedNumWorker);
                ServerUtils.validateTopologyAckerBundleResource(totalConfToSave, topology, topoName);

                int numEventLoggerExecs = ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), estimatedNumWorker);
                totalConfToSave.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, numEventLoggerExecs);
                LOG.debug("Config {} set to: {} for topology: {}",
                    Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, numEventLoggerExecs, topoName);

            }

            //Remove any configs that are specific to a host that might mess with the running topology.
            totalConfToSave.remove(Config.STORM_LOCAL_HOSTNAME); //Don't override the host name, or everything looks like it is on nimbus

            IStormClusterState state = stormClusterState;

            if (creds == null && workerTokenManager != null) {
                //Make sure we can store the worker tokens even if no creds are provided.
                creds = new HashMap<>();
            }
            if (creds != null) {
                Map<String, Object> finalConf = Collections.unmodifiableMap(topoConf);
                for (INimbusCredentialPlugin autocred : nimbusAutocredPlugins) {
                    autocred.populateCredentials(creds, finalConf);
                }
                upsertWorkerTokensInCreds(creds, topologyPrincipal, topoId);
            }

            if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)
                && (submitterUser == null || submitterUser.isEmpty())) {
                throw new WrappedAuthorizationException("Could not determine the user to run this topology as.");
            }
            StormCommon.systemTopology(totalConf, topology); //this validates the structure of the topology
            validateTopologySize(topoConf, conf, topology);
            if (Utils.isZkAuthenticationConfiguredStormServer(conf)
                && !Utils.isZkAuthenticationConfiguredTopology(topoConf)) {
                throw new IllegalArgumentException("The cluster is configured for zookeeper authentication, but no payload was provided.");
            }
            LOG.info("Received topology submission for {} (storm-{} JDK-{}) with conf {}", topoName,
                     topoVersionString, topology.get_jdk_version(), ConfigUtils.maskPasswords(topoConf));

            // lock protects against multiple topologies being submitted at once and
            // cleanup thread killing topology in b/w assignment and starting the topology
            synchronized (submitLock) {
                assertTopoActive(topoName, false);
                //cred-update-lock is not needed here because creds are being added for the first time.
                if (creds != null) {
                    state.setCredentials(topoId, new Credentials(creds), topoConf);
                }
                LOG.info("uploadedJar {} for {}", uploadedJarLocation, topoName);
                setupStormCode(conf, topoId, uploadedJarLocation, totalConfToSave, topology);
                waitForDesiredCodeReplication(totalConf, topoId);
                state.setupHeatbeats(topoId, topoConf);
                state.setupErrors(topoId, topoConf);
                if (ObjectReader.getBoolean(totalConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false)) {
                    state.setupBackpressure(topoId, topoConf);
                }
                notifyTopologyActionListener(topoName, "submitTopology");
                TopologyStatus status = null;
                switch (options.get_initial_status()) {
                    case INACTIVE:
                        status = TopologyStatus.INACTIVE;
                        break;
                    case ACTIVE:
                        status = TopologyStatus.ACTIVE;
                        break;
                    default:
                        throw new IllegalArgumentException("Inital Status of " + options.get_initial_status() + " is not allowed.");

                }
                startTopology(topoName, topoId, status, topologyOwner, topologyPrincipal, totalConfToSave, topology);
            }
        } catch (Exception e) {
            LOG.warn("Topology submission exception. (topology name='{}')", topoName, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }