r"""
Numeric operations
"""
from typing import Any, Iterable, List, Optional
import numpy as np
import scipy.sparse
from .typehint import Array
EPS = 1e-7
# ----------------------------- Numeric functions ------------------------------
[docs]def prod(x: Iterable) -> Any:
r"""
Product of elements
Parameters
----------
x
Input elements
Returns
-------
prod
Product
Note
----
For compatibility with Python<=3.7
"""
try:
from math import prod # pylint: disable=redefined-outer-name
return prod(x)
except ImportError:
ans = 1
for item in x:
ans = ans * item
return ans
[docs]def sigmoid(x: np.ndarray) -> np.ndarray:
r"""
The sigmoid function in numpy
Parameters
----------
x
Input
Returns
-------
s
Sigmoid(x)
"""
return 1 / (1 + np.exp(-x))
# ----------------------------- Arrays & Matrices ------------------------------
[docs]def densify(arr: Array, nan_sparse: bool = False) -> np.ndarray:
r"""
Convert a matrix to dense regardless of original type.
Parameters
----------
arr
Input array (either sparse or dense)
nan_sparse
Whether missing entries indicate nan
Returns
-------
densified
Densified array
"""
if scipy.sparse.issparse(arr):
if nan_sparse:
arr = arr.tocoo()
dense = np.full(arr.shape, np.nan, dtype=arr.dtype)
dense[arr.row, arr.col] = arr.data
return dense
return arr.toarray()
if isinstance(arr, np.ndarray):
return arr
return np.asarray(arr)
[docs]def col_var(X: Array, Y: Optional[Array] = None, bias: bool = False) -> np.ndarray:
r"""
Column-wise variance (sparse friendly)
Parameters
----------
X
First design matrix
Y
Second design matrix (optional)
bias
Whether to return unbiased or biased covariance estimation
Returns
-------
col_var
Column-wise variance, if only X is given.
Column-wise covariance, if both X and Y are given.
"""
Y = X if Y is None else Y
if X.shape != Y.shape:
raise ValueError("X and Y should have the same shape!")
bias_scaling = 1 if bias else X.shape[0] / (X.shape[0] - 1)
if scipy.sparse.issparse(X) or scipy.sparse.issparse(Y):
if not scipy.sparse.issparse(X):
X, Y = Y, X # does not affect trace
return (
np.asarray((X.multiply(Y)).mean(axis=0))
- np.asarray(X.mean(axis=0)) * np.asarray(Y.mean(axis=0))
).ravel() * bias_scaling
return ((X * Y).mean(axis=0) - X.mean(axis=0) * Y.mean(axis=0)) * bias_scaling
[docs]def col_pcc(X: Array, Y: Array) -> np.ndarray:
r"""
Column-wise Pearson's correlation coefficient (sparse friendly)
Parameters
----------
X
First design matrix
Y
Second design matrix
Returns
-------
pcc
Column-wise Pearson's correlation coefficients
"""
return col_var(X, Y) / np.sqrt(col_var(X) * col_var(Y))
[docs]def col_spr(X: Array, Y: Array) -> np.ndarray:
r"""
Column-wise Spearman's rank correlation
Parameters
----------
X
First design matrix
Y
Second design matrix
Returns
-------
spr
Column-wise Spearman's rank correlations
"""
X = densify(X)
X = np.array([scipy.stats.rankdata(X[:, i]) for i in range(X.shape[1])]).T
Y = densify(Y)
Y = np.array([scipy.stats.rankdata(Y[:, i]) for i in range(Y.shape[1])]).T
return col_pcc(X, Y)
[docs]def cov_mat(X: Array, Y: Optional[Array] = None, bias: bool = False) -> np.ndarray:
r"""
Covariance matrix (sparse friendly)
Parameters
----------
X
First design matrix
Y
Second design matrix (optional)
bias
Whether to return unbiased or biased covariance estimation
Returns
-------
cov
Covariance matrix, if only X is given.
Cross-covariance matrix, if both X and Y are given.
"""
X_mean = (
X.mean(axis=0) if scipy.sparse.issparse(X) else X.mean(axis=0, keepdims=True)
)
if Y is None:
Y, Y_mean = X, X_mean
else:
if X.shape[0] != Y.shape[0]:
raise ValueError("X and Y should have the same number of rows!")
Y_mean = (
Y.mean(axis=0)
if scipy.sparse.issparse(Y)
else Y.mean(axis=0, keepdims=True)
)
bias_scaling = 1 if bias else X.shape[0] / (X.shape[0] - 1)
return np.asarray((X.T @ Y) / X.shape[0] - X_mean.T @ Y_mean) * bias_scaling
[docs]def pcc_mat(X: Array, Y: Optional[Array] = None) -> np.ndarray:
r"""
Pearson's correlation coefficient (sparse friendly)
Parameters
----------
X
First design matrix
Y
Second design matrix (optional)
Returns
-------
pcc
Pearson's correlation matrix among columns of X, if only X is given.
Pearson's correlation matrix between columns of X and columns of Y,
if both X and Y are given.
"""
X = X.astype(np.float64)
Y = Y if Y is None else Y.astype(np.float64)
X_std = np.sqrt(col_var(X))[np.newaxis, :]
Y_std = X_std if Y is None else np.sqrt(col_var(Y))[np.newaxis, :]
pcc = cov_mat(X, Y) / X_std.T / Y_std
if Y is None:
assert (pcc - pcc.T).max() < EPS
pcc = (pcc + pcc.T) / 2 # Remove small floating point errors
assert np.abs(np.diag(pcc) - 1).max() < EPS
np.fill_diagonal(pcc, 1) # Remove small floating point errors
overshoot_mask = pcc > 1
if np.any(overshoot_mask):
assert (pcc[overshoot_mask] - 1).max() < EPS
pcc[overshoot_mask] = 1 # Remove small floating point errors
return pcc
[docs]def spr_mat(X: Array, Y: Optional[Array] = None) -> np.ndarray:
r"""
Spearman's rank correlation
Parameters
----------
X
First design matrix
Y
Second design matrix (optional)
Returns
-------
spr
Spearman's correlation matrix among columns of X, if only X is given.
Spearman's correlation matrix between columns of X and columns of Y,
if both X and Y are given.
"""
X = densify(X)
X = np.array([scipy.stats.rankdata(X[:, i]) for i in range(X.shape[1])]).T
if Y is not None:
Y = densify(Y)
Y = np.array([scipy.stats.rankdata(Y[:, i]) for i in range(Y.shape[1])]).T
return pcc_mat(X, Y)
[docs]def tfidf(X: Array, flavor: str = "seurat", **kwargs) -> Array:
r"""
TF-IDF normalization
Parameters
----------
X
Input matrix
flavor
Flavor of TF-IDF normalization, should be one of {"seurat", "allcools"}
kwargs
Additional keyword arguments are passed to each respective implementation
"""
if flavor == "seurat":
return tfidf_seurat(X, **kwargs)
if flavor == "allcools":
return tfidf_allcools(X, **kwargs)
raise ValueError("Unrecognized flavor!")
[docs]def tfidf_seurat(X: Array) -> Array:
r"""
TF-IDF normalization (following the Seurat v3 approach)
Parameters
----------
X
Input matrix
Returns
-------
X_tfidf
TF-IDF normalized matrix
"""
idf = X.shape[0] / X.sum(axis=0)
if scipy.sparse.issparse(X):
tf = X.multiply(1 / X.sum(axis=1))
return tf.multiply(idf)
else:
tf = X / X.sum(axis=1, keepdims=True)
return tf * idf
[docs]def tfidf_allcools(data, scale_factor=100000, idf=None):
r"""
TF-IDF normalization (following the AllCools approach)
Parameters
----------
data
Input matrix
scale_factor
Normalization factor for TF
idf
Inverse document frequency
Returns
-------
X_tfidf
TF-IDF normalized matrix
X_idf
IDF for future reference
"""
sparse_input = scipy.sparse.issparse(data)
if idf is None:
# add small value in case down sample creates empty feature
_col_sum = data.sum(axis=0)
if sparse_input:
col_sum = _col_sum.A1.astype(np.float32) + 0.00001
else:
col_sum = _col_sum.ravel().astype(np.float32) + 0.00001
idf = np.log(1 + data.shape[0] / col_sum).astype(np.float32)
else:
idf = idf.astype(np.float32)
_row_sum = data.sum(axis=1)
if sparse_input:
row_sum = _row_sum.A1.astype(np.float32) + 0.00001
else:
row_sum = _row_sum.ravel().astype(np.float32) + 0.00001
tf = data.astype(np.float32)
if sparse_input:
tf.data = tf.data / np.repeat(row_sum, tf.getnnz(axis=1))
tf.data = np.log1p(np.multiply(tf.data, scale_factor, dtype="float32"))
tf = tf.multiply(idf).tocsr()
else:
tf = tf / row_sum[:, np.newaxis]
tf = np.log1p(np.multiply(tf, scale_factor, dtype="float32"))
tf = tf * idf
return tf, idf
[docs]def prob_or(probs: List[float]) -> float:
r"""
Combined multiple probabilities in a logical OR manner.
Parameters
----------
probs
Array of probabilities
Returns
-------
prob
Combined probability
"""
return 1 - (1 - np.asarray(probs)).prod()
[docs]def vertex_degrees(
eidx: np.ndarray,
ewt: np.ndarray,
vnum: Optional[int] = None,
direction: str = "both",
) -> np.ndarray:
r"""
Compute vertex degrees
Parameters
----------
eidx
Vertex indices of edges (:math:`2 \times n_{edges}`)
ewt
Weight of edges (:math:`n_{edges}`)
vnum
Total number of vertices (determined by max edge index if not specified)
direction
Direction of vertex degree, should be one of {"in", "out", "both"}
Returns
-------
degrees
Vertex degrees
"""
vnum = vnum or eidx.max() + 1
adj = scipy.sparse.coo_matrix((ewt, (eidx[0], eidx[1])), shape=(vnum, vnum))
if direction == "in":
return adj.sum(axis=0).A1
elif direction == "out":
return adj.sum(axis=1).A1
elif direction == "both":
return adj.sum(axis=0).A1 + adj.sum(axis=1).A1 - adj.diagonal()
raise ValueError("Unrecognized direction!")
[docs]def normalize_edges(
eidx: np.ndarray, ewt: np.ndarray, method: str = "keepvar"
) -> np.ndarray:
r"""
Normalize graph edge weights
Parameters
----------
eidx
Vertex indices of edges (:math:`2 \times n_{edges}`)
ewt
Weight of edges (:math:`n_{edges}`)
method
Normalization method, should be one of {"in", "out", "sym", "keepvar"}
Returns
-------
enorm
Normalized weight of edges (:math:`n_{edges}`)
"""
if method not in ("in", "out", "sym", "keepvar"):
raise ValueError("Unrecognized method!")
enorm = ewt
if method in ("in", "keepvar", "sym"):
in_degrees = vertex_degrees(eidx, ewt, direction="in")
in_normalizer = np.power(in_degrees[eidx[1]], -1 if method == "in" else -0.5)
in_normalizer[~np.isfinite(in_normalizer)] = (
0 # In case there are unconnected vertices
)
enorm = enorm * in_normalizer
if method in ("out", "sym"):
out_degrees = vertex_degrees(eidx, ewt, direction="out")
out_normalizer = np.power(out_degrees[eidx[0]], -1 if method == "out" else -0.5)
out_normalizer[~np.isfinite(out_normalizer)] = (
0 # In case there are unconnected vertices
)
enorm = enorm * out_normalizer
return enorm
[docs]def all_counts(x: Array) -> bool:
r"""
Check whether an array contains all counts
Parameters
----------
x
Array to check
Returns
-------
is_counts
Whether the array contains all counts
"""
if scipy.sparse.issparse(x):
x = x.tocsr().data
if x.min() < 0:
return False
return np.allclose(x, x.astype(int))