scglue.metrics 源代码

r"""
Performance evaluation metrics
"""

from typing import Tuple

import numpy as np
import pandas as pd
import scanpy as sc
import scipy.spatial
import sklearn.metrics
import sklearn.neighbors
from anndata import AnnData
from scipy.sparse.csgraph import connected_components

from .typehint import RandomState
from .utils import get_rs


[文档]def mean_average_precision( x: np.ndarray, y: np.ndarray, neighbor_frac: float = 0.01, **kwargs ) -> float: r""" Mean average precision Parameters ---------- x Coordinates y Cell type labels neighbor_frac Nearest neighbor fraction **kwargs Additional keyword arguments are passed to :class:`sklearn.neighbors.NearestNeighbors` Returns ------- map Mean average precision """ k = max(round(y.shape[0] * neighbor_frac), 1) nn = sklearn.neighbors.NearestNeighbors( n_neighbors=min(y.shape[0], k + 1), **kwargs ).fit(x) nni = nn.kneighbors(x, return_distance=False) match = np.equal(y[nni[:, 1:]], np.expand_dims(y, 1)) return np.apply_along_axis(_average_precision, 1, match).mean().item()
def _average_precision(match: np.ndarray) -> float: if np.any(match): cummean = np.cumsum(match) / (np.arange(match.size) + 1) return cummean[match].mean().item() return 0.0
[文档]def normalized_mutual_info(x: np.ndarray, y: np.ndarray, **kwargs) -> float: r""" Normalized mutual information with true clustering Parameters ---------- x Coordinates y Cell type labels **kwargs Additional keyword arguments are passed to :func:`sklearn.metrics.normalized_mutual_info_score` Returns ------- nmi Normalized mutual information Note ---- Follows the definition in `OpenProblems NeurIPS 2021 competition <https://openproblems.bio/neurips_docs/about_tasks/task3_joint_embedding/>`__ """ x = AnnData(X=x, dtype=x.dtype) sc.pp.neighbors(x, n_pcs=0, use_rep="X") nmi_list = [] for res in (np.arange(20) + 1) / 10: sc.tl.leiden(x, resolution=res) leiden = x.obs["leiden"] nmi_list.append(sklearn.metrics.normalized_mutual_info_score( y, leiden, **kwargs ).item()) return max(nmi_list)
[文档]def avg_silhouette_width(x: np.ndarray, y: np.ndarray, **kwargs) -> float: r""" Cell type average silhouette width Parameters ---------- x Coordinates y Cell type labels **kwargs Additional keyword arguments are passed to :func:`sklearn.metrics.silhouette_score` Returns ------- asw Cell type average silhouette width Note ---- Follows the definition in `OpenProblems NeurIPS 2021 competition <https://openproblems.bio/neurips_docs/about_tasks/task3_joint_embedding/>`__ """ return (sklearn.metrics.silhouette_score(x, y, **kwargs).item() + 1) / 2
[文档]def graph_connectivity( x: np.ndarray, y: np.ndarray, **kwargs ) -> float: r""" Graph connectivity Parameters ---------- x Coordinates y Cell type labels **kwargs Additional keyword arguments are passed to :func:`scanpy.pp.neighbors` Returns ------- conn Graph connectivity """ x = AnnData(X=x, dtype=x.dtype) sc.pp.neighbors(x, n_pcs=0, use_rep="X", **kwargs) conns = [] for y_ in np.unique(y): x_ = x[y == y_] _, c = connected_components( x_.obsp['connectivities'], connection='strong' ) counts = pd.value_counts(c) conns.append(counts.max() / counts.sum()) return np.mean(conns).item()
[文档]def seurat_alignment_score( x: np.ndarray, y: np.ndarray, neighbor_frac: float = 0.01, n_repeats: int = 4, random_state: RandomState = None, **kwargs ) -> float: r""" Seurat alignment score Parameters ---------- x Coordinates y Batch labels neighbor_frac Nearest neighbor fraction n_repeats Number of subsampling repeats random_state Random state **kwargs Additional keyword arguments are passed to :class:`sklearn.neighbors.NearestNeighbors` Returns ------- sas Seurat alignment score """ rs = get_rs(random_state) idx_list = [np.where(y == u)[0] for u in np.unique(y)] min_size = min(idx.size for idx in idx_list) repeat_scores = [] for _ in range(n_repeats): subsample_idx = np.concatenate([ rs.choice(idx, min_size, replace=False) for idx in idx_list ]) subsample_x = x[subsample_idx] subsample_y = y[subsample_idx] k = max(round(subsample_idx.size * neighbor_frac), 1) nn = sklearn.neighbors.NearestNeighbors( n_neighbors=k + 1, **kwargs ).fit(subsample_x) nni = nn.kneighbors(subsample_x, return_distance=False) same_y_hits = ( subsample_y[nni[:, 1:]] == np.expand_dims(subsample_y, axis=1) ).sum(axis=1).mean() repeat_score = (k - same_y_hits) * len(idx_list) / (k * (len(idx_list) - 1)) repeat_scores.append(min(repeat_score, 1)) # score may exceed 1, if same_y_hits is lower than expected by chance return np.mean(repeat_scores).item()
[文档]def avg_silhouette_width_batch( x: np.ndarray, y: np.ndarray, ct: np.ndarray, **kwargs ) -> float: r""" Batch average silhouette width Parameters ---------- x Coordinates y Batch labels ct Cell type labels **kwargs Additional keyword arguments are passed to :func:`sklearn.metrics.silhouette_samples` Returns ------- asw_batch Batch average silhouette width Note ---- Follows the definition in `OpenProblems NeurIPS 2021 competition <https://openproblems.bio/neurips_docs/about_tasks/task3_joint_embedding/>`__ """ s_per_ct = [] for t in np.unique(ct): mask = ct == t try: s = sklearn.metrics.silhouette_samples(x[mask], y[mask], **kwargs) except ValueError: # Too few samples s = 0 s = (1 - np.fabs(s)).mean() s_per_ct.append(s) return np.mean(s_per_ct).item()
[文档]def neighbor_conservation( x: np.ndarray, y: np.ndarray, batch: np.ndarray, neighbor_frac: float = 0.01, **kwargs ) -> float: r""" Neighbor conservation score Parameters ---------- x Cooordinates after integration y Coordinates before integration b Batch **kwargs Additional keyword arguments are passed to :class:`sklearn.neighbors.NearestNeighbors` Returns ------- nn_cons Neighbor conservation score """ nn_cons_per_batch = [] for b in np.unique(batch): mask = batch == b x_, y_ = x[mask], y[mask] k = max(round(x.shape[0] * neighbor_frac), 1) nnx = sklearn.neighbors.NearestNeighbors( n_neighbors=min(x_.shape[0], k + 1), **kwargs ).fit(x_).kneighbors_graph(x_) nny = sklearn.neighbors.NearestNeighbors( n_neighbors=min(y_.shape[0], k + 1), **kwargs ).fit(y_).kneighbors_graph(y_) nnx.setdiag(0) # Remove self nny.setdiag(0) # Remove self n_intersection = nnx.multiply(nny).sum(axis=1).A1 n_union = (nnx + nny).astype(bool).sum(axis=1).A1 nn_cons_per_batch.append((n_intersection / n_union).mean()) return np.mean(nn_cons_per_batch).item()
[文档]def foscttm( x: np.ndarray, y: np.ndarray, **kwargs ) -> Tuple[np.ndarray, np.ndarray]: r""" Fraction of samples closer than true match (smaller is better) Parameters ---------- x Coordinates for samples in modality X y Coordinates for samples in modality y **kwargs Additional keyword arguments are passed to :func:`scipy.spatial.distance_matrix` Returns ------- foscttm_x, foscttm_y FOSCTTM for samples in modality X and Y, respectively Note ---- Samples in modality X and Y should be paired and given in the same order """ if x.shape != y.shape: raise ValueError("Shapes do not match!") d = scipy.spatial.distance_matrix(x, y, **kwargs) foscttm_x = (d < np.expand_dims(np.diag(d), axis=1)).mean(axis=1) foscttm_y = (d < np.expand_dims(np.diag(d), axis=0)).mean(axis=0) return foscttm_x, foscttm_y