Source code for ciss_vae.utils.clustering

"""run_cissvae takes in the dataset as an input and (optionally) clusters on missingness before running ciss_vae full model."""
from __future__ import annotations
from typing import Optional, Sequence, Tuple, Union
import pandas as pd
import numpy as np

## helper function for getting leiden clusters from snn graph

import numpy as np
def _leiden_from_snn(
    X: np.ndarray,
    *,
    metric: str = "euclidean",
    k: int = 50,
    resolution: float = 0.01,
    objective: str = "CPM",
    mutual: bool = False,
    seed: int | None = None,
):
    try:
        from sklearn.neighbors import NearestNeighbors
        from scipy.sparse import csr_matrix
        import igraph as ig
        import leidenalg as la
    except ImportError as e:
        raise ImportError(
            "Leiden SNN requires scikit-learn, scipy, python-igraph, leidenalg."
        ) from e

    metric = metric.lower()

    ## Forces brute, skip the auto thing
    # algo = "auto" if metric in {"euclidean"} else "brute"

    # kNN connectivity (binary) graph
    nn = NearestNeighbors(n_neighbors=k, metric=metric, algorithm="brute")
    nn.fit(X)
    A = nn.kneighbors_graph(n_neighbors=k, mode="connectivity").tocsr()
    AT = A.T.tocsr()

    n = A.shape[0]
    src = []
    dst = []
    wts = []

    # For each i, compute shared-neighbor counts with its k neighbors
    for i in range(n):
        start, end = A.indptr[i], A.indptr[i + 1]
        neigh = A.indices[start:end]
        if neigh.size == 0:
            continue

        # overlap counts: (# shared neighbors) for each j in neigh
        # A[neigh] @ A[i, :].T gives overlap counts as a sparse matrix
        # Convert to dense array for easier handling
        overlaps = A[neigh].dot(A[i, :].T).toarray().flatten()  # Fixed: .A1 -> .toarray().flatten()

        deg_i = neigh.size
        deg_js = A.indptr[neigh + 1] - A.indptr[neigh]
        unions = deg_i + deg_js - overlaps

        # Jaccard weight in [0,1]
        with np.errstate(divide="ignore", invalid="ignore"):
            w = overlaps / np.maximum(unions, 1)
        
        # Optional: keep only mutual neighbors
        if mutual:
            incoming = AT.indices[AT.indptr[i] : AT.indptr[i + 1]]
            mask = np.isin(neigh, incoming, assume_unique=False)
            neigh = neigh[mask]
            w = w[mask]

        # keep positive weights
        pos = w > 0
        neigh = neigh[pos]
        w = w[pos]
        if neigh.size == 0:
            continue

        src.append(np.full(neigh.size, i, dtype=np.int32))
        dst.append(neigh.astype(np.int32))
        wts.append(w.astype(float))

    if not src:
        raise ValueError("SNN graph ended up empty; try mutual=False, increasing k, or using cosine.")

    src = np.concatenate(src)
    dst = np.concatenate(dst)
    wts = np.concatenate(wts)

    # Build symmetric weighted adjacency: take max(i->j, j->i)
    from scipy.sparse import coo_matrix
    W = coo_matrix((wts, (src, dst)), shape=(n, n)).tocsr()
    W = W.maximum(W.T)

    # Build igraph
    coo = W.tocoo()
    mask = coo.row < coo.col
    edges = list(zip(coo.row[mask].tolist(), coo.col[mask].tolist()))
    weights = coo.data[mask].astype(float)

    g = ig.Graph(n=n, edges=edges, directed=False)
    g.es["weight"] = list(map(float, weights))

    if seed is not None:
        la.Optimiser().set_rng_seed(int(seed))

    obj = objective.lower()
    if obj == "cpm":
        part = la.find_partition(
            g, la.CPMVertexPartition, weights="weight", resolution_parameter=resolution
        )
    elif obj in {"rb", "rbconfig", "rbconfiguration"}:
        part = la.find_partition(
            g, la.RBConfigurationVertexPartition, weights="weight", resolution_parameter=resolution
        )
    else:
        part = la.find_partition(g, la.ModularityVertexPartition, weights="weight")

    labels = np.asarray(part.membership, dtype=int)
    return labels
##  helper function for getting leiden clusters from knn graph

def _leiden_from_knn(
    X: np.ndarray,
    *,
    metric: str,
    k: int = 15,
    resolution: float = 0.5,
    objective: str = "CPM",  # {"CPM","RB","Modularity"}
    seed: Optional[int] = None,
    weight_scheme: str = "auto",  # "auto", "heat", "inv", "1-minus"
):
    """
    Build a kNN graph on X and run Leiden. Returns integer labels.
    """
    try:
        from sklearn.neighbors import NearestNeighbors
        import igraph as ig
        import leidenalg as la
    except ImportError as e:
        raise ImportError(
            "Leiden path requires: scikit-learn, python-igraph, and leidenalg.\n"
            "Install with: pip install python-igraph leidenalg scikit-learn"
        ) from e

    metric = metric.lower()
    # Build kNN graph with distances
    # Use brute for metrics like 'jaccard'/'cosine' to be safe and consistent
    algo = "auto" if metric in {"euclidean"} else "brute"
    nn = NearestNeighbors(n_neighbors=k, metric=metric, algorithm=algo)
    nn.fit(X)
    A = nn.kneighbors_graph(n_neighbors=k, mode="distance")  # CSR sparse, stores distances

    # Convert distances to similarity weights for Leiden
    d = A.data
    if weight_scheme == "auto":
        if metric in {"jaccard", "cosine"}:
            w = 1.0 - d
            if metric == "cosine":
                # cosine similarity can be negative if vectors aren't normalized
                w = np.clip(w, 0.0, None)
        elif metric == "euclidean":
            # Heat kernel based on median neighbor distance
            sigma = np.median(d) + 1e-12
            w = np.exp(-(d / sigma) ** 2)
        else:
            # Fallback: inverse distance
            w = 1.0 / (d + 1e-12)
    elif weight_scheme == "1-minus":
        w = 1.0 - d
    elif weight_scheme == "heat":
        sigma = np.median(d) + 1e-12
        w = np.exp(-(d / sigma) ** 2)
    elif weight_scheme == "inv":
        w = 1.0 / (d + 1e-12)
    else:
        raise ValueError("Unknown weight_scheme.")

    # Replace distances with weights
    from scipy.sparse import csr_matrix
    W = csr_matrix((w, A.indices, A.indptr), shape=A.shape)
    # Symmetrize by taking the maximum weight in either direction
    W = W.maximum(W.T)

    # Build igraph
    coo = W.tocoo()
    mask = coo.row < coo.col  # undirected: take upper triangle once
    edges = list(zip(coo.row[mask].tolist(), coo.col[mask].tolist()))
    weights = coo.data[mask].astype(float)

    import igraph as ig
    import leidenalg as la

    g = ig.Graph(n=W.shape[0], edges=edges, directed=False)
    g.es["weight"] = list(map(float, weights))

    if seed is not None:
        la.Optimiser().set_rng_seed(int(seed))

    obj = objective.lower()
    if obj == "cpm":
        part = la.find_partition(
            g, la.CPMVertexPartition, weights="weight", resolution_parameter=resolution
        )
    elif obj in {"rb", "rbconfig", "rbconfiguration"}:
        part = la.find_partition(
            g, la.RBConfigurationVertexPartition, weights="weight", resolution_parameter=resolution
        )
    else:
        # Modularity (no resolution parameter in standard form)
        part = la.find_partition(g, la.ModularityVertexPartition, weights="weight")

    labels = np.asarray(part.membership, dtype=int)
    return labels

# -------------------
# Func 1: Cluster on missingness
# -------------------
## leiden if no k specified

[docs] def cluster_on_missing( data, cols_ignore=None, n_clusters=None, # if n_clusters = None use Leiden k_neighbors: int = 15, use_snn: bool = True, leiden_resolution: float = 0.5, leiden_objective: str = "CPM", seed: int = 42 ): """ Cluster samples based on their missingness patterns using KMeans or Leiden. When ``n_clusters`` is ``None``, performs Leiden clustering on a graph constructed from the binary missingness mask of the dataset. If ``use_snn=True``, builds a shared-nearest-neighbor (SNN) graph using Jaccard similarity; otherwise, constructs a standard kNN graph with Jaccard weights. Returns both the cluster labels and an optional silhouette score. :param data: Input dataset with potential missing values, shape ``(n_samples, n_features)``. Non-numeric columns should be excluded or specified in ``cols_ignore``. :type data: pandas.DataFrame :param cols_ignore: Column names to exclude from the missingness pattern clustering. Typically includes identifiers or static metadata columns. Defaults to ``None``. :type cols_ignore: list[str] or None, optional :param n_clusters: Number of clusters for KMeans. If ``None``, uses Leiden clustering on the binary missingness mask instead. Defaults to ``None``. :type n_clusters: int or None, optional :param k_neighbors: Number of nearest neighbors used when constructing the kNN/SNN graph for Leiden clustering. Defaults to ``15``. :type k_neighbors: int, optional :param use_snn: If ``True``, constructs a shared-nearest-neighbor (SNN) graph using mutual neighbor overlap weighted by Jaccard similarity. If ``False``, uses standard kNN graph weighting by Jaccard distance. Defaults to ``True``. :type use_snn: bool, optional :param leiden_resolution: Resolution parameter for Leiden clustering; higher values yield more clusters. Defaults to ``0.5``. :type leiden_resolution: float, optional :param leiden_objective: Objective function for Leiden optimization. One of ``{"CPM", "RB", "Modularity"}``. Defaults to ``"CPM"``. :type leiden_objective: str, optional :param seed: Random seed for reproducibility in KMeans and Leiden algorithms. Defaults to ``42``. :type seed: int, optional :returns: Tuple ``(labels, silhouette)``: - **labels** (:class:`numpy.ndarray`): Cluster assignments of length ``n_samples``. - **silhouette** (:class:`float` or ``None``): Silhouette score computed using Jaccard distance on the binary missingness mask; ``None`` if undefined. :rtype: tuple[numpy.ndarray, float or None] **Example**:: >>> labels, silh = cluster_on_missing(data, n_clusters=None, use_snn=True) >>> np.unique(labels) array([0, 1, 2]) >>> print(f"Silhouette: {silh:.3f}") Silhouette: 0.408 """ try: from sklearn.cluster import KMeans from sklearn.metrics import silhouette_score except ImportError as e: raise ImportError( "This function requires scikit-learn. Install with: pip install scikit-learn" ) from e # Boolean missingness mask: True=missing, False=observed mask_matrix = ( data.drop(columns=cols_ignore).isna().to_numpy(dtype=bool) if cols_ignore is not None else data.isna().to_numpy(dtype=bool) ) # --------------------------- # Branch: Leiden or KMeans # --------------------------- if n_clusters is None: # Route to your existing helpers (keeps code unified) if use_snn: labels = _leiden_from_snn( mask_matrix, # boolean array OK (sklearn jaccard supports boolean) metric="jaccard", k=k_neighbors, resolution=leiden_resolution, objective=leiden_objective, mutual=False, seed=seed, ) else: labels = _leiden_from_knn( mask_matrix, metric="jaccard", k=k_neighbors, resolution=leiden_resolution, objective=leiden_objective, seed=seed, weight_scheme="1-minus", # similarity = 1 - jaccard distance ) X_for_sil = mask_matrix sil_metric = "jaccard" else: # KMeans on the binary mask n_init = "auto" try: _ = KMeans(n_clusters=2, n_init=n_init) except TypeError: n_init = 10 # scikit-learn < 1.4 km = KMeans(n_clusters=n_clusters, n_init=n_init, random_state=seed) labels = km.fit_predict(mask_matrix.astype(float)) X_for_sil = mask_matrix sil_metric = "jaccard" # Silhouette if ≥2 clusters and all cluster sizes ≥2 unique, counts = np.unique(labels, return_counts=True) silhouette = None if len(unique) > 1 and np.all(counts >= 2): silhouette = silhouette_score(X_for_sil, labels, metric=sil_metric) return labels, silhouette
[docs] def cluster_on_missing_prop( prop_matrix: Union[pd.DataFrame, np.ndarray], *, n_clusters: Optional[int] = None, seed: Optional[int] = None, # Leiden params (used when n_clusters is None) k_neighbors: int = 15, use_snn: bool = True, snn_mutual: bool = True, leiden_resolution: float = 0.5, leiden_objective: str = "CPM", metric: str = "euclidean", # use "euclidean" or "cosine" for proportions scale_features: bool = False, ) -> Tuple[np.ndarray, Optional[float]]: """ Cluster samples based on their per-feature missingness proportions using KMeans or Leiden. When ``n_clusters`` is ``None``, performs Leiden clustering on a graph constructed from the missingness proportion matrix. If ``use_snn=True``, builds a shared-nearest-neighbor (SNN) graph with Jaccard-based or metric-based similarity; otherwise uses a standard kNN graph. Returns both the cluster labels and an optional silhouette score. :param prop_matrix: Matrix of missingness proportions, shape ``(n_samples, n_features)``. Each entry represents the fraction of missing values for a feature within each sample. Values must lie in ``[0, 1]``. :type prop_matrix: pandas.DataFrame or numpy.ndarray :param n_clusters: Number of clusters for KMeans. If ``None``, uses Leiden clustering instead. Defaults to ``None``. :type n_clusters: int or None, optional :param seed: Random seed for KMeans initialization and Leiden reproducibility. Defaults to ``None``. :type seed: int or None, optional :param k_neighbors: Number of nearest neighbors for kNN/SNN graph construction used by Leiden. Defaults to ``15``. :type k_neighbors: int, optional :param use_snn: If ``True``, constructs a shared-nearest-neighbor (SNN) graph using mutual or weighted neighbor overlap. If ``False``, uses standard kNN. Defaults to ``True``. :type use_snn: bool, optional :param snn_mutual: If ``True``, retains only mutual nearest neighbors when building the SNN graph. Defaults to ``True``. :type snn_mutual: bool, optional :param leiden_resolution: Resolution parameter controlling cluster granularity in Leiden. Higher values produce more clusters. Defaults to ``0.5``. :type leiden_resolution: float, optional :param leiden_objective: Objective function for Leiden optimization. One of ``{"CPM", "RB", "Modularity"}``. Defaults to ``"CPM"``. :type leiden_objective: str, optional :param metric: Distance metric used for kNN graph construction and silhouette calculation. Recommended options are ``"euclidean"`` or ``"cosine"``. Defaults to ``"euclidean"``. :type metric: str, optional :param scale_features: Whether to standardize features (zero mean, unit variance) prior to clustering. Recommended when feature scales differ widely. Defaults to ``False``. :type scale_features: bool, optional :returns: Tuple ``(labels, silhouette)``: - **labels** (:class:`numpy.ndarray`): Cluster assignments of length ``n_samples``. - **silhouette** (:class:`float` or ``None``): Silhouette score based on the same metric; ``None`` if undefined (e.g., single cluster). :rtype: tuple[numpy.ndarray, float or None] **Example**:: >>> labels, silh = cluster_on_missing_prop(prop_matrix, n_clusters=None, use_snn=True) >>> np.unique(labels) array([0, 1, 2, 3]) >>> print(f"Silhouette: {silh:.3f}") Silhouette: 0.421 """ # Optional deps kept inside try: from sklearn.cluster import KMeans from sklearn.metrics import silhouette_score from sklearn.preprocessing import StandardScaler except ImportError as e: raise ImportError( "Optional dependencies required: scikit-learn (and leidenalg via _leiden_from_knn).\n" "Install with: pip install ciss_vae[clustering]" ) from e # Convert to array (handle DataFrame and custom classes with to_numpy) if isinstance(prop_matrix, pd.DataFrame): X = prop_matrix.to_numpy(dtype=float, copy=True) elif hasattr(prop_matrix, "to_numpy"): X = prop_matrix.to_numpy(dtype=float, copy=True) # supports MissingnessMatrix else: X = np.asarray(prop_matrix, dtype=float).copy() if X.ndim != 2: raise ValueError("prop_matrix must be 2D (n_samples × n_features).") n_samples, n_features = X.shape # Sanity checks: finite and within [0,1] if not np.isfinite(X).all(): raise ValueError("prop_matrix contains non-finite values (NaN/Inf).") if (X < 0).any() or (X > 1).any(): X = np.clip(X, 0.0, 1.0) # Optionally scale features X_rows = X if scale_features: X_rows = StandardScaler().fit_transform(X_rows) metric = metric.lower() if metric not in {"euclidean", "cosine"}: raise ValueError("metric must be 'euclidean' or 'cosine' for proportion data.") # Clustering if n_clusters is None: # Leiden on kNN graph derived from the chosen metric if use_snn: labels = _leiden_from_snn( X_rows, metric=metric, k=k_neighbors, resolution=leiden_resolution, objective=leiden_objective, mutual=snn_mutual, seed=seed, ) else: labels = _leiden_from_knn( X_rows, metric=metric, k=k_neighbors, resolution=leiden_resolution, objective=leiden_objective, seed=seed, weight_scheme="auto", ) X_for_sil = X_rows sil_metric = metric else: n_init = "auto" try: _ = KMeans(n_clusters=2, n_init=n_init) except TypeError: n_init = 10 km = KMeans(n_clusters=n_clusters, n_init=n_init, random_state=seed) labels = km.fit_predict(X_rows) X_for_sil = X_rows sil_metric = metric # Silhouette if valid unique, counts = np.unique(labels, return_counts=True) silhouette = None if len(unique) > 1 and np.all(counts >= 2): silhouette = silhouette_score(X_for_sil, labels, metric=sil_metric) return labels, silhouette