# graspologic/cluster/autogmm.py
def fit(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> "AutoGMMCluster":
    """
    Fits a Gaussian mixture model to the data.

    Each candidate model is initialized with agglomerative clustering,
    and its parameters are then estimated with the EM algorithm.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        List of n_features-dimensional data points. Each row
        corresponds to a single data point.
    y : array-like, shape (n_samples,), optional (default=None)
        List of labels for X, if available. Used to compute
        ARI scores.

    Returns
    -------
    self : object
        Returns an instance of self.
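
    Examples
    --------
    A minimal sketch of typical usage, assuming ``AutoGMMCluster`` is
    exported from ``graspologic.cluster``.

    >>> import numpy as np
    >>> from graspologic.cluster import AutoGMMCluster
    >>> X = np.random.default_rng(0).normal(size=(100, 2))
    >>> gmm = AutoGMMCluster(min_components=1, max_components=3)
    >>> gmm = gmm.fit(X)  # doctest: +SKIP
    >>> gmm.n_components_  # doctest: +SKIP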
"""
# Deal with number of clusters
if self.max_components is None:
lower_ncomponents = 1
upper_ncomponents = self.min_components
else:
lower_ncomponents = self.min_components
upper_ncomponents = self.max_components
    if upper_ncomponents > X.shape[0]:
        if self.max_components is None:
            msg = "if max_components is None then min_components must be <= "
            msg += "n_samples, but min_components = {}, n_samples = {}".format(
                upper_ncomponents, X.shape[0]
            )
        else:
            msg = "max_components must be <= n_samples, but max_components = "
            msg += "{}, n_samples = {}".format(upper_ncomponents, X.shape[0])
        raise ValueError(msg)
    elif lower_ncomponents > X.shape[0]:
        msg = "min_components must be <= n_samples, but min_components = "
        msg += "{}, n_samples = {}".format(lower_ncomponents, X.shape[0])
        raise ValueError(msg)

    # Cosine affinity is undefined for zero vectors, so drop "cosine" from
    # the affinity candidates if X contains one.
    if np.any(~X.any(axis=1)) and ("cosine" in self.affinity):
        if isinstance(self.affinity, np.ndarray):
            self.affinity = np.delete(
                self.affinity, np.argwhere(self.affinity == "cosine")
            )
        if isinstance(self.affinity, list):
            self.affinity.remove("cosine")
        warnings.warn("X contains a zero vector, will not run cosine affinity.")
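
    # Validate any user-supplied initial labels: there must be exactly one
    # label per sample.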
    label_init = self.label_init
    if label_init is not None:
        if label_init.size != X.shape[0]:
            msg = "n_samples must be the same as the length of label_init"
            raise ValueError(msg)
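
    # Build the Cartesian product of all hyperparameter candidates, then
    # split it into agglomeration-specific settings and full per-model
    # settings.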
    param_grid_values = dict(
        affinity=self.affinity,
        linkage=self.linkage,
        covariance_type=self.covariance_type,
        n_components=range(lower_ncomponents, upper_ncomponents + 1),
    )
    param_grid: ParamGridType = list(ParameterGrid(param_grid_values))
    param_grid_ag, processed_param_grid = _process_paramgrid(
        param_grid, self.kmeans_n_init, self.label_init
    )
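
    # Draw one RNG seed per parameter combination so that the parallel fits
    # below are reproducible when an integer random_state is given.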
    if isinstance(self.random_state, int):
        np.random.seed(self.random_state)
        seeds = np.random.randint(
            np.iinfo(np.int32).max, size=len(processed_param_grid)
        )
    else:
        seeds = [self.random_state] * len(processed_param_grid)

    n = X.shape[0]
    if self.max_agglom_size is None or n <= self.max_agglom_size:
        X_subset = X
    else:  # if the dataset is huge, agglomerate a random subset
        # sample without replacement so the subset contains no duplicate rows
        subset_idxs = np.random.choice(
            np.arange(0, n), self.max_agglom_size, replace=False
        )
        X_subset = X[subset_idxs, :]
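
    # Precompute agglomerative initializations for every affinity/linkage
    # combination; _hierarchical_labels expands the merge tree into one label
    # vector per candidate number of components.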
    ag_labels = []
    if self.label_init is None:
        for p_ag in param_grid_ag:
            if p_ag["affinity"] != "none":
                agg = AgglomerativeClustering(
                    n_clusters=self.min_components, **p_ag
                )
                agg.fit(X_subset)
                hierarchical_labels = _hierarchical_labels(
                    agg.children_, lower_ncomponents, upper_ncomponents
                )
                ag_labels.append(hierarchical_labels)
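
    # Worker run once per parameter combination: look up the matching
    # precomputed agglomerative labels (if any) and fit a single GMM.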
    def _fit_for_data(p: ParamGridType, seed: int) -> Dict[str, Any]:
        n_clusters = p[1]["n_components"]
        if (p[0]["affinity"] != "none") and (self.label_init is None):
            index = param_grid_ag.index(p[0])
            agg_clustering = ag_labels[index][:, n_clusters - self.min_components]
        else:
            agg_clustering = []
        return self._fit_cluster(X, X_subset, y, p, agg_clustering, seed)
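
    # Fit all candidate models in parallel, one job per parameter combination.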
    results = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)(
        delayed(_fit_for_data)(p, seed)
        for p, seed in zip(processed_param_grid, seeds)
    )
    results = pd.DataFrame(results)
    self.results_ = results

    # Select the best model overall (lowest BIC/AIC) and record its
    # parameters on the fitted estimator.
    best_idx = results["bic/aic"].idxmin()
    self.criter_ = results.loc[best_idx, "bic/aic"]
    self.n_components_ = results.loc[best_idx, "n_components"]
    self.covariance_type_ = results.loc[best_idx, "covariance_type"]
    self.affinity_ = results.loc[best_idx, "affinity"]
    self.linkage_ = results.loc[best_idx, "linkage"]
    self.reg_covar_ = results.loc[best_idx, "reg_covar"]
    self.ari_ = results.loc[best_idx, "ari"]
    self.model_ = results.loc[best_idx, "model"]
    return self