from __future__ import annotations
from typing import Dict, Optional, List, Any, Union, Set
import re
import numpy as np
import pandas as pd
class MissingnessMatrix:
    """A matrix with missingness proportions and metadata.

    Thin wrapper around a NumPy array that carries feature and sample labels
    alongside it and can be used directly in NumPy operations.

    :param data: Array of missingness proportions; ``shape`` reports it as
        ``(n_samples, n_features)``.
    :type data: numpy.ndarray
    :param feature_columns_map: Mapping of feature name → source columns that
        contributed to it. Stored as given.
    :type feature_columns_map: dict[str, list[str]]
    :param feature_names: Column labels used by :meth:`to_dataframe`.
    :type feature_names: list[str]
    :param sample_names: Row labels used by :meth:`to_dataframe`. When ``None``
        or empty, ``0 .. len(data) - 1`` is used.
    :type sample_names: list[str] or None, optional
    """

    def __init__(
        self,
        data: np.ndarray,
        feature_columns_map: Dict[str, List[str]],
        feature_names: List[str],
        sample_names: Optional[List[str]] = None,
    ):
        self.data = data
        self.feature_columns_map = feature_columns_map
        self.feature_names = feature_names
        # Explicit None/length test rather than `sample_names or ...`:
        # truth-testing a NumPy array or pandas Index of names raises.
        if sample_names is None or len(sample_names) == 0:
            sample_names = list(range(len(data)))
        self.sample_names = sample_names

    @property
    def shape(self):
        """Return (n_samples, n_features)."""
        return self.data.shape

    def __getitem__(self, key):
        """Index into the underlying array."""
        return self.data[key]

    def __array__(self, dtype=None, copy=None):
        """Allow NumPy ops directly on this object.

        Implements the NumPy array-conversion protocol, including the optional
        ``dtype`` and ``copy`` arguments NumPy may pass.

        :param dtype: Requested dtype, or ``None`` to keep the stored dtype.
        :param copy: ``True`` forces a copy, ``False`` forbids one, ``None``
            copies only when a cast is required.
        :raises ValueError: If ``copy`` is ``False`` but a cast is required.
        """
        arr = self.data
        if dtype is not None and arr.dtype != np.dtype(dtype):
            if copy is False:
                raise ValueError(
                    "Unable to avoid copy while converting MissingnessMatrix "
                    f"data from {arr.dtype} to {np.dtype(dtype)}."
                )
            # astype allocates a fresh array, which also satisfies copy=True.
            return arr.astype(dtype)
        if copy:
            return arr.copy()
        return arr

    def to_dataframe(self) -> pd.DataFrame:
        """Convert to pandas DataFrame with preserved names."""
        return pd.DataFrame(
            self.data, columns=self.feature_names, index=self.sample_names
        )

    def to_numpy(self, dtype=None, copy: bool = False) -> np.ndarray:
        """Return the underlying NumPy array (optionally cast/copied).

        :param dtype: Target dtype; ``None`` keeps the stored dtype.
        :param copy: If true, the returned array never aliases ``self.data``.
        :returns: ``self.data`` itself when no cast or copy is requested,
            otherwise a new array.
        """
        arr = self.data
        if dtype is not None and arr.dtype != dtype:
            # A dtype-changing astype already returns a new array, so no
            # second copy is needed even when copy=True.
            return arr.astype(dtype)
        if copy:
            return arr.copy()
        return arr

    def __repr__(self) -> str:
        """Full string representation (no preview/truncation)."""
        return str(self.to_dataframe())

    def __str__(self) -> str:
        """Full string representation (no preview/truncation)."""
        return str(self.to_dataframe())

    def head(self, n: int = 5):
        """Return the first ``n`` rows as a DataFrame (default 5)."""
        return self.to_dataframe().head(n)
def create_missingness_prop_matrix(
    data: Union[pd.DataFrame, np.ndarray],
    index_col: Optional[str] = None,
    cols_ignore: Optional[List[str]] = None,
    na_values: Optional[List[Any]] = None,
    repeat_feature_names: Optional[List[str]] = None,
    timepoint_prefix: Optional[str] = None,
    nonint_timepoint: bool = False,
    column_mapping: Optional[Dict[str, List[str]]] = None,
    loose: bool = False,
) -> MissingnessMatrix:
    """
    Create a missingness proportion matrix summarizing feature-level missingness per sample.

    Computes the proportion of missing values for each feature within each sample,
    optionally aggregating repeated measurements (e.g., ``feature_t1``, ``feature_t2``).
    Can also accept an explicit ``column_mapping`` from base feature → list of columns.

    Column labels are compared as strings throughout: the frame's columns are
    cast to ``str``, and entries of ``cols_ignore`` and ``column_mapping`` are
    cast the same way, so integer labels (e.g. from ndarray input) work.

    :param data: Input dataset (coercible to DataFrame). Not modified.
    :type data: pandas.DataFrame or numpy.ndarray
    :param index_col: Optional column to use as sample index in the output metadata (not scored).
        Ignored if it is not a column of ``data``; the frame's own index
        (as strings) is then used for sample names.
    :type index_col: str or None, optional
    :param cols_ignore: Columns to exclude from scoring (e.g., IDs, non-features).
        Names that are not columns of ``data`` are ignored.
    :type cols_ignore: list[str] or None, optional
    :param na_values: Extra values to treat as missing (in addition to NaN/None/±Inf).
    :type na_values: list[Any] or None, optional
    :param repeat_feature_names: Base feature names that have repeated timepoints to be aggregated.
        Columns matched by regex pattern (first applicable rule wins):

        - if ``nonint_timepoint`` is true: ``^<feat>_[A-Za-z0-9]+$``
        - else if ``timepoint_prefix`` is provided: ``^<feat>_<prefix>\\d+$``
        - else if ``loose`` is true: ``^<feat>``
        - else: ``^<feat>_\\d+$``
    :type repeat_feature_names: list[str] or None, optional
    :param timepoint_prefix: Optional prefix that appears before the timepoint integer,
        e.g., ``t`` to match ``feat_t1``.
    :type timepoint_prefix: str or None, optional
    :param nonint_timepoint: If true, any alphanumeric text after '_' will count as
        timepoint (eg Baseline).
    :type nonint_timepoint: bool, optional
    :param column_mapping: Explicit mapping { base_feature: [col1, col2, ...] } to aggregate.
        Takes precedence over ``repeat_feature_names`` for the same base name.
    :type column_mapping: dict[str, list[str]] or None, optional
    :param loose: If true, will match any column starting with the base feature names in
        ``repeat_feature_names``.
    :type loose: bool
    :returns: MissingnessMatrix with:

        - ``data``: (n_samples, n_features) matrix of missingness proportions
        - ``feature_columns_map``: mapping of base features → contributing columns
        - ``to_dataframe()`` to view as DataFrame
    :rtype: MissingnessMatrix
    :raises ValueError: If an argument has the wrong type, no feature columns
        remain after exclusions, ``column_mapping`` names unknown columns, or a
        repeated feature matches no columns.
    """
    # -------------------------------
    # 1) Validate & normalize inputs
    # -------------------------------
    if not isinstance(data, (pd.DataFrame, np.ndarray)):
        raise ValueError("`data` must be a pandas DataFrame or numpy.ndarray.")
    if index_col is not None and not isinstance(index_col, str):
        raise ValueError("`index_col` must be None or a string.")
    # Validate BEFORE list() coercion: list("id") would silently become
    # ['i', 'd'] and the mistake would surface later as a confusing error
    # (or not at all).
    if isinstance(cols_ignore, (str, bytes)):
        raise ValueError("`cols_ignore` must be a list or None.")
    if isinstance(repeat_feature_names, (str, bytes)):
        raise ValueError("`repeat_feature_names` must be a list or None.")
    if column_mapping is not None and not isinstance(column_mapping, dict):
        raise ValueError("`column_mapping` must be a dict or None.")

    # Coerce to DataFrame (no in-place mutation of original)
    df = pd.DataFrame(data).copy() if isinstance(data, np.ndarray) else data.copy()
    # Normalize column names to strings (prevents regex/type issues)
    df.columns = df.columns.astype(str)
    known_cols: Set[str] = set(df.columns)

    # Defaults; labels are stringified to match the normalized frame columns
    cols_ignore = [] if cols_ignore is None else [str(c) for c in cols_ignore]
    repeat_feature_names = (
        [] if repeat_feature_names is None else list(repeat_feature_names)
    )
    # pandas.isna already covers None/NaN; ±Inf is always handled in is_missing
    na_values = [np.inf, -np.inf] if na_values is None else list(na_values)

    # -------------------------------
    # 2) Sample names & drop columns
    # -------------------------------
    use_index_col = index_col is not None and index_col in known_cols
    if use_index_col:
        sample_names = df[index_col].astype(str).tolist()
    else:
        sample_names = df.index.astype(str).tolist()

    # Columns excluded from scoring
    cols_to_drop: Set[str] = {c for c in cols_ignore if c in known_cols}
    if use_index_col:
        cols_to_drop.add(index_col)

    # Candidate feature columns (post-exclusions), in original column order
    feature_candidate_cols = [c for c in df.columns if c not in cols_to_drop]
    if not feature_candidate_cols:
        raise ValueError("After excluding `index_col` and `cols_ignore`, no feature columns remain.")

    # --------------------------------------------------
    # 3) Build feature → columns mapping
    #    precedence: column_mapping (explicit) > repeat_feature_names (regex) > singletons
    # --------------------------------------------------
    feature_to_cols: Dict[str, List[str]] = {}
    consumed_cols: Set[str] = set()

    # (A) explicit mapping takes precedence
    if column_mapping:
        # Stringify first so validation and adoption see the same labels.
        normalized_mapping = {
            base: [str(c) for c in cols] for base, cols in column_mapping.items()
        }
        missing = {
            base: [c for c in cols if c not in known_cols]
            for base, cols in normalized_mapping.items()
        }
        missing = {k: v for k, v in missing.items() if v}
        if missing:
            raise ValueError(f"`column_mapping` refers to missing columns: {missing}")
        # adopt mapping (preserve key order)
        for base, cols_list in normalized_mapping.items():
            feature_to_cols[base] = cols_list
            consumed_cols.update(cols_list)

    # (B) repeated features by regex (only those not already covered by mapping)
    for feat in repeat_feature_names:
        if feat in feature_to_cols:
            # already defined via mapping; skip regex collection for this base
            continue
        feat_escaped = re.escape(feat)
        if nonint_timepoint:
            pattern = rf"^{feat_escaped}_[A-Za-z0-9]+$"
        elif timepoint_prefix:
            pattern = rf"^{feat_escaped}_{re.escape(timepoint_prefix)}\d+$"
        elif loose:
            pattern = rf"^{feat_escaped}"
        else:
            pattern = rf"^{feat_escaped}_\d+$"
        matcher = re.compile(pattern)
        matching_cols = [c for c in feature_candidate_cols if matcher.match(c)]
        if not matching_cols:
            # Only mention the prefixed form when a prefix was actually given;
            # otherwise the hint would read '<feat>_None1'.
            hint = f"Ensure columns look like '{feat}_1', '{feat}_2', ..."
            if timepoint_prefix:
                hint += f" (or '{feat}_{timepoint_prefix}1', ...)"
            raise ValueError(
                f"No columns found for repeated feature '{feat}' using pattern '{pattern}'. "
                f"{hint}."
            )
        feature_to_cols[feat] = matching_cols
        consumed_cols.update(matching_cols)

    # (C) remaining single columns become their own features
    for c in feature_candidate_cols:
        if c not in consumed_cols:
            feature_to_cols[c] = [c]

    # Final feature order follows dict insertion order
    # (mapping keys first, then repeats, then singletons)
    out_features: List[str] = list(feature_to_cols)
    if not out_features:
        raise ValueError("No features to score after processing mapping and repeats.")

    # -------------------------------------------
    # 4) Missingness checker (vectorized friendly)
    # -------------------------------------------
    def is_missing(arr_like: Union[pd.Series, np.ndarray]) -> np.ndarray:
        """
        Return boolean mask where values are considered missing:
        - pandas.isna (NaN/None)
        - ±Inf
        - any user-specified values in `na_values`
        """
        x = arr_like.values if isinstance(arr_like, pd.Series) else np.asarray(arr_like)
        miss = pd.isna(x)
        # include ±Inf as missing (non-numeric entries coerce to NaN → not inf)
        with np.errstate(all="ignore"):
            miss |= np.isinf(pd.to_numeric(x, errors="coerce"))
        # include any explicit values
        for na in na_values:
            try:
                miss |= (x == na)
            except Exception:
                # Deliberate best-effort: comparing or OR-ing can fail for
                # mixed/extension dtypes; such a sentinel simply cannot match
                # this column, so skip it rather than abort the whole run.
                continue
        return miss

    # -------------------------------------------
    # 5) Compute per-sample missingness proportion
    # -------------------------------------------
    n_samples = len(df)
    n_features = len(out_features)
    out = np.full((n_samples, n_features), np.nan, dtype=float)
    for j, feat in enumerate(out_features):
        subdf = df[feature_to_cols[feat]]
        # boolean matrix: True where missing
        miss_matrix = subdf.apply(is_missing, axis=0)
        # mean across timepoints/columns → per-row proportion missing for this feature
        out[:, j] = miss_matrix.mean(axis=1).to_numpy(dtype=float)

    # -------------------------------------------
    # 6) Package result
    # -------------------------------------------
    return MissingnessMatrix(
        data=out,
        feature_columns_map=feature_to_cols,
        feature_names=out_features,
        sample_names=sample_names,
    )