public Observable&lt;AddNewClusterModel&gt; validateAndAdd()

in Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/serverexplore/AddNewClusterCtrlProvider.java [178:329]


    /**
     * Validates the cluster settings collected from the view and, on success, links the
     * cluster by registering it with {@link ClusterManagerEx}.
     *
     * Pipeline: read the current model from the view, run all validation (and the actual
     * linking) on a background scheduler while a "Validating the cluster settings..."
     * progress bar is shown, then write the (possibly error-annotated) model back to the
     * view on the UI dispatch thread. Emissions with a non-empty error message are
     * filtered out, so downstream subscribers only observe successfully linked clusters.
     *
     * @return an {@code Observable} that emits the model only when validation and linking succeed
     */
    public Observable<AddNewClusterModel> validateAndAdd() {
        return Observable.just(new AddNewClusterModel())
                .doOnNext(controllableView::getData)    // populate the fresh model from the view's fields
                .observeOn(ideSchedulers.processBarVisibleAsync("Validating the cluster settings..."))
                .map(toUpdate -> {
                    SparkClusterType sparkClusterType = toUpdate.getSparkClusterType();
                    String clusterNameOrUrl = toUpdate.getClusterName();
                    String userName = Optional.ofNullable(toUpdate.getUserName()).orElse("");
                    AuthType authType = toUpdate.getAuthType();
                    String storageName = toUpdate.getStorageName();
                    String storageKey = toUpdate.getStorageKey();
                    String password = Optional.ofNullable(toUpdate.getPassword()).orElse("");
                    URI livyEndpoint = toUpdate.getLivyEndpoint();
                    URI yarnEndpoint = toUpdate.getYarnEndpoint();
                    String host = toUpdate.getHost();
                    int knoxPort = toUpdate.getKnoxPort();
                    int selectedContainerIndex = toUpdate.getSelectedContainerIndex();


                    // For HDInsight linked cluster, only real cluster name or real cluster endpoint(pattern as https://sparkcluster.azurehdinsight.net/) are allowed to be cluster name
                    // For HDInsight livy linked or aris linked cluster, cluster name format is not restricted
                    final String clusterName = sparkClusterType == SparkClusterType.HDINSIGHT_CLUSTER
                            ? getClusterName(clusterNameOrUrl)
                            : clusterNameOrUrl;

                    // These validation checks are redundant for IntelliJ since IntelliJ does the full check
                    // at the view level, but they are necessary for Eclipse
                    HDStorageAccount storageAccount = null;
                    if (sparkClusterType == SparkClusterType.HDINSIGHT_CLUSTER) {
                        // Required-field check: none of name/user/password may contain whitespace
                        // (blank fields are caught here too). Failing fields get a "* " highlight
                        // prefix prepended to their labels exactly once.
                        if (StringUtils.containsWhitespace(clusterNameOrUrl) ||
                                StringUtils.containsWhitespace(userName) ||
                                StringUtils.containsWhitespace(password)) {
                            String highlightPrefix = "* ";

                            if (!toUpdate.getClusterNameLabelTitle().startsWith(highlightPrefix)) {
                                toUpdate.setClusterNameLabelTitle(highlightPrefix + toUpdate.getClusterNameLabelTitle());
                            }

                            if (!toUpdate.getUserNameLabelTitle().startsWith(highlightPrefix)) {
                                toUpdate.setUserNameLabelTitle(highlightPrefix + toUpdate.getUserNameLabelTitle());
                            }

                            if (!toUpdate.getPasswordLabelTitle().startsWith(highlightPrefix)) {
                                toUpdate.setPasswordLabelTitle(highlightPrefix + toUpdate.getPasswordLabelTitle());
                            }

                            return toUpdate.setErrorMessage("All (*) fields are required.");
                        }

                        // Cluster name check: getClusterName() returns null for an unrecognized name/endpoint
                        if (clusterName == null) {
                            return toUpdate.setErrorMessage("Wrong cluster name or endpoint");
                        }

                        // Duplication check against already-linked clusters
                        // (SQL Big Data clusters live in a separate namespace and are excluded)
                        if (ClusterManagerEx.getInstance().getAdditionalClusterDetails().stream()
                                .filter(clusterDetail -> !(clusterDetail instanceof SqlBigDataLivyLinkClusterDetail))
                                .anyMatch(clusterDetail -> clusterDetail.getName().equals(clusterName))) {
                            return toUpdate.setErrorMessage("Cluster already exists in linked list");
                        }

                        // Storage access check — only performed when a storage account was supplied
                        if (StringUtils.isNotEmpty(storageName)) {
                            ClientStorageAccount storageAccountClient = new ClientStorageAccount(storageName);
                            storageAccountClient.setPrimaryKey(storageKey);

                            // Storage Key check: building the cloud account fails fast on a bad key
                            try {
                                StorageClientSDKManager.getCloudStorageAccount(storageAccountClient.getConnectionString());
                            } catch (Exception ex) {
                                return toUpdate.setErrorMessage("Storage key doesn't match the account.");
                            }

                            // Containers selection check
                            if (selectedContainerIndex < 0 ||
                                    selectedContainerIndex >= toUpdate.getContainers().size()) {
                                return toUpdate.setErrorMessage("The storage container isn't selected");
                            }

                            storageAccount = new HDStorageAccount(
                                    null,
                                    ClusterManagerEx.getInstance().getBlobFullName(storageName),
                                    storageKey,
                                    false,
                                    toUpdate.getContainers().get(selectedContainerIndex));
                        }
                    }

                    // Build the cluster detail to link; the concrete type depends on the cluster kind
                    IClusterDetail additionalClusterDetail = null;
                    switch (sparkClusterType) {
                        case HDINSIGHT_CLUSTER:
                            additionalClusterDetail =
                                  authType == AuthType.BasicAuth
                                          ? new HDInsightAdditionalClusterDetail(clusterName, userName, password, storageAccount)
                                          : new MfaHdiAdditionalClusterDetail(clusterName, userName, password, storageAccount);
                            break;
                        case LIVY_LINK_CLUSTER:
                            additionalClusterDetail =
                                    new HDInsightLivyLinkClusterDetail(livyEndpoint, yarnEndpoint, clusterName, userName, password);
                            break;
                        case SQL_BIG_DATA_CLUSTER:
                            additionalClusterDetail =
                                    new SqlBigDataLivyLinkClusterDetail(host, knoxPort, clusterName, userName, password);
                            break;
                        default:
                            // NOTE(review): for any future SparkClusterType value additionalClusterDetail
                            // stays null and the authenticate() call below will fail into the generic
                            // catch — confirm whether an explicit error message is wanted here.
                            break;
                    }

                    // Account certificate check — attempt to authenticate against the cluster
                    try {
                        JobUtils.authenticate(additionalClusterDetail);
                    } catch (AuthenticationException authErr) {
                        String errorMsg = "Authentication Error: " + Optional.ofNullable(authErr.getMessage())
                                .filter(msg -> !msg.isEmpty())
                                .orElse("Wrong username/password") +
                                " (" + authErr.getErrorCode() + ")";
                        return toUpdate
                                .setErrorMessage(errorMsg)
                                .setErrorMessageList(ImmutableList.of(Pair.of(toUpdate.ERROR_OUTPUT, errorMsg)));
                    } catch (SSLHandshakeException ex) {
                        // user rejects the certificate authority when linking an aris cluster
                        if (sparkClusterType == SparkClusterType.SQL_BIG_DATA_CLUSTER && ex.getCause() instanceof CertificateException) {
                            return toUpdate
                                    .setErrorMessage(UserRejectCAErrorMsg)
                                    .setErrorMessageList(ImmutableList.of(Pair.of(toUpdate.ERROR_OUTPUT, UserRejectCAErrorMsg)));
                        }

                        String errorMsg = "Authentication Error: " + ex.getMessage();
                        return toUpdate
                                .setErrorMessage(errorMsg)
                                .setErrorMessageList(ImmutableList.of(Pair.of(toUpdate.ERROR_OUTPUT, errorMsg)));
                    } catch (SSLPeerUnverifiedException ex) {
                        // Peer not verified: also surface certificate guidance to the user
                        String errorMsg = "Authentication Error: " + ex.getMessage();
                        return toUpdate
                                .setErrorMessage(errorMsg)
                                .setErrorMessageList(ImmutableList.of(
                                        Pair.of(toUpdate.ERROR_OUTPUT, errorMsg),
                                        Pair.of(toUpdate.ERROR_OUTPUT, InvalidCertificateErrorMsg),
                                        Pair.of(toUpdate.NORMAL_OUTPUT, InvalidCertificateInfoMsg)));
                    } catch (Exception ex) {
                        String errorMsg = "Authentication Error: " + ex.getMessage();
                        return toUpdate
                                .setErrorMessage(errorMsg)
                                .setErrorMessageList(ImmutableList.of(Pair.of(toUpdate.ERROR_OUTPUT, errorMsg)));
                    }

                    // No issue — persist the linked cluster
                    ClusterManagerEx.getInstance().addAdditionalCluster(additionalClusterDetail);

                    // Clear any stale error state so the downstream filter lets this emission through
                    return toUpdate.setErrorMessage(null).setErrorMessageList(null);
                })
                .observeOn(ideSchedulers.dispatchUIThread())     // UI operation needs to be in dispatch thread
                .doOnNext(controllableView::setData)
                .filter(data -> StringUtils.isEmpty(data.getErrorMessage()));
    }