in Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/serverexplore/AddNewClusterCtrlProvider.java [178:329]
public Observable<AddNewClusterModel> validateAndAdd() {
return Observable.just(new AddNewClusterModel())
.doOnNext(controllableView::getData)
.observeOn(ideSchedulers.processBarVisibleAsync("Validating the cluster settings..."))
.map(toUpdate -> {
SparkClusterType sparkClusterType = toUpdate.getSparkClusterType();
String clusterNameOrUrl = toUpdate.getClusterName();
String userName = Optional.ofNullable(toUpdate.getUserName()).orElse("");
AuthType authType = toUpdate.getAuthType();
String storageName = toUpdate.getStorageName();
String storageKey = toUpdate.getStorageKey();
String password = Optional.ofNullable(toUpdate.getPassword()).orElse("");
URI livyEndpoint = toUpdate.getLivyEndpoint();
URI yarnEndpoint = toUpdate.getYarnEndpoint();
String host = toUpdate.getHost();
int knoxPort = toUpdate.getKnoxPort();
int selectedContainerIndex = toUpdate.getSelectedContainerIndex();
// For HDInsight linked cluster, only real cluster name or real cluster endpoint(pattern as https://sparkcluster.azurehdinsight.net/) are allowed to be cluster name
// For HDInsight livy linked or aris linked cluster, cluster name format is not restricted
final String clusterName = sparkClusterType == SparkClusterType.HDINSIGHT_CLUSTER
? getClusterName(clusterNameOrUrl)
: clusterNameOrUrl;
// These validation check are redundant for intelliJ sicne intellij does full check at view level
// but necessary for Eclipse
HDStorageAccount storageAccount = null;
if (sparkClusterType == SparkClusterType.HDINSIGHT_CLUSTER) {
if (StringUtils.containsWhitespace(clusterNameOrUrl) ||
StringUtils.containsWhitespace(userName) ||
StringUtils.containsWhitespace(password)) {
String highlightPrefix = "* ";
if (!toUpdate.getClusterNameLabelTitle().startsWith(highlightPrefix)) {
toUpdate.setClusterNameLabelTitle(highlightPrefix + toUpdate.getClusterNameLabelTitle());
}
if (!toUpdate.getUserNameLabelTitle().startsWith(highlightPrefix)) {
toUpdate.setUserNameLabelTitle(highlightPrefix + toUpdate.getUserNameLabelTitle());
}
if (!toUpdate.getPasswordLabelTitle().startsWith(highlightPrefix)) {
toUpdate.setPasswordLabelTitle(highlightPrefix + toUpdate.getPasswordLabelTitle());
}
return toUpdate.setErrorMessage("All (*) fields are required.");
}
// Cluster name check
if (clusterName == null) {
return toUpdate.setErrorMessage("Wrong cluster name or endpoint");
}
// Duplication check
if (ClusterManagerEx.getInstance().getAdditionalClusterDetails().stream()
.filter(clusterDetail -> !(clusterDetail instanceof SqlBigDataLivyLinkClusterDetail))
.anyMatch(clusterDetail -> clusterDetail.getName().equals(clusterName))) {
return toUpdate.setErrorMessage("Cluster already exists in linked list");
}
// Storage access check
if (StringUtils.isNotEmpty(storageName)) {
ClientStorageAccount storageAccountClient = new ClientStorageAccount(storageName);
storageAccountClient.setPrimaryKey(storageKey);
// Storage Key check
try {
StorageClientSDKManager.getCloudStorageAccount(storageAccountClient.getConnectionString());
} catch (Exception ex) {
return toUpdate.setErrorMessage("Storage key doesn't match the account.");
}
// Containers selection check
if (selectedContainerIndex < 0 ||
selectedContainerIndex >= toUpdate.getContainers().size()) {
return toUpdate.setErrorMessage("The storage container isn't selected");
}
storageAccount = new HDStorageAccount(
null,
ClusterManagerEx.getInstance().getBlobFullName(storageName),
storageKey,
false,
toUpdate.getContainers().get(selectedContainerIndex));
}
}
IClusterDetail additionalClusterDetail = null;
switch (sparkClusterType) {
case HDINSIGHT_CLUSTER:
additionalClusterDetail =
authType == AuthType.BasicAuth
? new HDInsightAdditionalClusterDetail(clusterName, userName, password, storageAccount)
: new MfaHdiAdditionalClusterDetail(clusterName, userName, password, storageAccount);
break;
case LIVY_LINK_CLUSTER:
additionalClusterDetail =
new HDInsightLivyLinkClusterDetail(livyEndpoint, yarnEndpoint, clusterName, userName, password);
break;
case SQL_BIG_DATA_CLUSTER:
additionalClusterDetail =
new SqlBigDataLivyLinkClusterDetail(host, knoxPort, clusterName, userName, password);
}
// Account certificate check
try {
JobUtils.authenticate(additionalClusterDetail);
} catch (AuthenticationException authErr) {
String errorMsg = "Authentication Error: " + Optional.ofNullable(authErr.getMessage())
.filter(msg -> !msg.isEmpty())
.orElse("Wrong username/password") +
" (" + authErr.getErrorCode() + ")";
return toUpdate
.setErrorMessage(errorMsg)
.setErrorMessageList(ImmutableList.of(Pair.of(toUpdate.ERROR_OUTPUT, errorMsg)));
} catch (SSLHandshakeException ex) {
//user rejects the ac when linking aris cluster
if (sparkClusterType == SparkClusterType.SQL_BIG_DATA_CLUSTER && ex.getCause() instanceof CertificateException) {
return toUpdate
.setErrorMessage(UserRejectCAErrorMsg)
.setErrorMessageList(ImmutableList.of(Pair.of(toUpdate.ERROR_OUTPUT, UserRejectCAErrorMsg)));
}
String errorMsg = "Authentication Error: " + ex.getMessage();
return toUpdate
.setErrorMessage(errorMsg)
.setErrorMessageList(ImmutableList.of(Pair.of(toUpdate.ERROR_OUTPUT, errorMsg)));
} catch (SSLPeerUnverifiedException ex) {
String errorMsg = "Authentication Error: " + ex.getMessage();
return toUpdate
.setErrorMessage(errorMsg)
.setErrorMessageList(ImmutableList.of(
Pair.of(toUpdate.ERROR_OUTPUT, errorMsg),
Pair.of(toUpdate.ERROR_OUTPUT, InvalidCertificateErrorMsg),
Pair.of(toUpdate.NORMAL_OUTPUT, InvalidCertificateInfoMsg)));
} catch (Exception ex) {
String errorMsg = "Authentication Error: " + ex.getMessage();
return toUpdate
.setErrorMessage(errorMsg)
.setErrorMessageList(ImmutableList.of(Pair.of(toUpdate.ERROR_OUTPUT, errorMsg)));
}
// No issue
ClusterManagerEx.getInstance().addAdditionalCluster(additionalClusterDetail);
return toUpdate.setErrorMessage(null).setErrorMessageList(null);
})
.observeOn(ideSchedulers.dispatchUIThread()) // UI operation needs to be in dispatch thread
.doOnNext(controllableView::setData)
.filter(data -> StringUtils.isEmpty(data.getErrorMessage()));
}