r"""
Auxiliary functions for :class:`anndata.AnnData` objects
that are not covered in :mod:`scanpy`.
"""
import os
from collections import defaultdict
from itertools import chain
from typing import Callable, List, Mapping, Optional
import anndata as ad
import networkx as nx
import numpy as np
import pandas as pd
import scanpy as sc
import scipy.sparse
import scipy.stats
import sklearn.cluster
import sklearn.decomposition
import sklearn.feature_extraction.text
import sklearn.linear_model
import sklearn.neighbors
import sklearn.preprocessing
import sklearn.utils.extmath
from anndata import AnnData
from networkx.algorithms.bipartite import biadjacency_matrix
from sklearn.preprocessing import normalize
from sparse import COO
from tqdm.auto import tqdm
from . import genomics, num
from .typehint import Kws
from .utils import logged
def count_prep(adata: AnnData) -> None:
r"""
Standard preprocessing of a count-based dataset: total-count
normalization followed by log-transformation
Parameters
----------
adata
Dataset to be preprocessed
"""
sc.pp.normalize_total(adata)
sc.pp.log1p(adata)
def lsi(
adata: AnnData, n_components: int = 20,
use_highly_variable: Optional[bool] = None, **kwargs
) -> None:
r"""
LSI analysis (following the Seurat v3 approach)
Parameters
----------
adata
Input dataset
n_components
Number of dimensions to use
use_highly_variable
Whether to use highly variable features only, stored in
``adata.var['highly_variable']``. By default uses them if they
have been determined beforehand.
**kwargs
Additional keyword arguments are passed to
:func:`sklearn.utils.extmath.randomized_svd`
"""
if "random_state" not in kwargs:
kwargs["random_state"] = 0 # Keep deterministic as the default behavior
if use_highly_variable is None:
use_highly_variable = "highly_variable" in adata.var
adata_use = adata[:, adata.var["highly_variable"]] if use_highly_variable else adata
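# Seurat v3-style LSI: TF-IDF weighting, per-cell L1 normalization and log
# scaling, truncated SVD, then per-cell standardization of the components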
X = num.tfidf(adata_use.X)
X_norm = normalize(X, norm="l1")
X_norm = np.log1p(X_norm * 1e4)
X_lsi = sklearn.utils.extmath.randomized_svd(X_norm, n_components, **kwargs)[0]
X_lsi -= X_lsi.mean(axis=1, keepdims=True)
X_lsi /= X_lsi.std(axis=1, ddof=1, keepdims=True)
adata.obsm["X_lsi"] = X_lsi
@logged
def get_gene_annotation(
adata: AnnData, var_by: Optional[str] = None,
gtf: Optional[os.PathLike] = None, gtf_by: Optional[str] = None,
by_func: Optional[Callable] = None
) -> None:
r"""
Get genomic annotation of genes by joining with a GTF file.
Parameters
----------
adata
Input dataset
var_by
Specify a column in ``adata.var`` used to merge with GTF attributes,
otherwise ``adata.var_names`` is used by default.
gtf
Path to the GTF file
gtf_by
Specify a field in the GTF attributes used to merge with ``adata.var``,
e.g. "gene_id", "gene_name".
by_func
Specify an element-wise function used to transform merging fields,
e.g. removing suffix in gene IDs.
Note
----
Genomic locations are converted to the 0-based coordinates used by the
BED format, rather than the 1-based coordinates used by the GTF format.
"""
if gtf is None:
raise ValueError("Missing required argument `gtf`!")
if gtf_by is None:
raise ValueError("Missing required argument `gtf_by`!")
var_by = adata.var_names if var_by is None else adata.var[var_by]
gtf = genomics.read_gtf(gtf).query("feature == 'gene'").split_attribute()
if by_func:
by_func = np.vectorize(by_func)
var_by = by_func(var_by)
gtf[gtf_by] = by_func(gtf[gtf_by]) # Safe inplace modification
gtf = gtf.sort_values("seqname").drop_duplicates(
subset=[gtf_by], keep="last"
) # Typically, scaffolds come first, chromosomes come last
merge_df = pd.concat([
pd.DataFrame(gtf.to_bed(name=gtf_by)),
pd.DataFrame(gtf).drop(columns=genomics.Gtf.COLUMNS)  # Only use the split attributes
], axis=1).set_index(gtf_by).reindex(var_by).set_index(adata.var.index)
adata.var = adata.var.assign(**merge_df)
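# Example usage (a minimal sketch; `rna` and the GTF path are hypothetical, and
# ``gtf_by="gene_name"`` assumes ``rna.var_names`` are gene symbols). BED-style
# coordinate columns plus the split GTF attributes are appended to ``rna.var``:
#
#     get_gene_annotation(rna, gtf="gencode.annotation.gtf.gz", gtf_by="gene_name")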
def aggregate_obs(
adata: AnnData, by: str, X_agg: Optional[str] = "sum",
obs_agg: Optional[Mapping[str, str]] = None,
obsm_agg: Optional[Mapping[str, str]] = None,
layers_agg: Optional[Mapping[str, str]] = None
) -> AnnData:
r"""
Aggregate observations in a dataset by the categories of a given column
Parameters
----------
adata
Dataset to be aggregated
by
Specify a column in ``adata.obs`` used for aggregation,
must be discrete.
X_agg
Aggregation function for ``adata.X``, must be one of
``{"sum", "mean", ``None``}``. Setting to ``None`` discards
the ``adata.X`` matrix.
obs_agg
Aggregation methods for ``adata.obs``, indexed by obs columns,
must be one of ``{"sum", "mean", "majority"}``, where ``"sum"``
and ``"mean"`` are for continuous data, and ``"majority"`` is for
discrete data. Fields not specified will be discarded.
obsm_agg
Aggregation methods for ``adata.obsm``, indexed by obsm keys,
must be one of ``{"sum", "mean"}``. Fields not specified will be
discarded.
layers_agg
Aggregation methods for ``adata.layers``, indexed by layer keys,
must be one of ``{"sum", "mean"}``. Fields not specified will be
discarded.
Returns
-------
aggregated
Aggregated dataset
"""
obs_agg = obs_agg or {}
obsm_agg = obsm_agg or {}
layers_agg = layers_agg or {}
by = adata.obs[by]
agg_idx = pd.Index(by.cat.categories) \
if pd.api.types.is_categorical_dtype(by) \
else pd.Index(np.unique(by))
agg_sum = scipy.sparse.coo_matrix((
np.ones(adata.shape[0]), (
agg_idx.get_indexer(by),
np.arange(adata.shape[0])
)
)).tocsr()
agg_mean = agg_sum.multiply(1 / agg_sum.sum(axis=1))
agg_method = {
"sum": lambda x: agg_sum @ x,
"mean": lambda x: agg_mean @ x,
"majority": lambda x: pd.crosstab(by, x).idxmax(axis=1).loc[agg_idx].to_numpy()
}
X = agg_method[X_agg](adata.X) if X_agg and adata.X is not None else None
obs = pd.DataFrame({
k: agg_method[v](adata.obs[k])
for k, v in obs_agg.items()
}, index=agg_idx.astype(str))
obsm = {
k: agg_method[v](adata.obsm[k])
for k, v in obsm_agg.items()
}
layers = {
k: agg_method[v](adata.layers[k])
for k, v in layers_agg.items()
}
for c in obs:
if pd.api.types.is_categorical_dtype(adata.obs[c]):
obs[c] = pd.Categorical(obs[c], categories=adata.obs[c].cat.categories)
return AnnData(
X=X, obs=obs, var=adata.var,
obsm=obsm, varm=adata.varm, layers=layers,
dtype=None if X is None else X.dtype
)
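# Example usage (a minimal sketch; the "cell_type" and "domain" columns and the
# "X_pca" embedding are hypothetical):
#
#     metacells = aggregate_obs(
#         adata, by="cell_type", X_agg="sum",
#         obs_agg={"domain": "majority"}, obsm_agg={"X_pca": "mean"}
#     )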
def transfer_labels(
ref: AnnData, query: AnnData, field: str,
n_neighbors: int = 30, use_rep: Optional[str] = None,
key_added: Optional[str] = None, **kwargs
) -> None:
r"""
Transfer discrete labels from a reference dataset to a query dataset
Parameters
----------
ref
Reference dataset
query
Query dataset
field
Field to be transferred in ``ref.obs`` (must be discrete)
n_neighbors
Number of nearest neighbors used for label transfer
use_rep
Data representation based on which to find nearest neighbors,
by default uses ``{ref, query}.X``.
key_added
New ``query.obs`` key added for the transferred labels,
by default the same as ``field``.
**kwargs
Additional keyword arguments are passed to
:class:`sklearn.neighbors.NearestNeighbors`
Note
----
First, nearest neighbors between reference and query cells are searched and
weighted by the Jaccard index of shared nearest neighbors (SNN). The Jaccard
indices are then normalized per query cell to form a mapping matrix. To
obtain predictions for query cells, the mapping matrix is multiplied by the
one-hot matrix of reference labels. The category with the highest score is
taken as the final prediction, and its score is interpreted as the transfer
confidence (stored as "{key_added}_confidence" in ``query.obs``).
"""
xrep = ref.obsm[use_rep] if use_rep else ref.X
yrep = query.obsm[use_rep] if use_rep else query.X
xnn = sklearn.neighbors.NearestNeighbors(
n_neighbors=n_neighbors, **kwargs
).fit(xrep)
ynn = sklearn.neighbors.NearestNeighbors(
n_neighbors=n_neighbors, **kwargs
).fit(yrep)
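# Neighbor graphs within and across datasets: `xx`/`xy` are the reference- and
# query-side neighbors of each reference cell, `yx`/`yy` those of each query cell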
xx = xnn.kneighbors_graph(xrep)
xy = ynn.kneighbors_graph(xrep)
yx = xnn.kneighbors_graph(yrep)
yy = ynn.kneighbors_graph(yrep)
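# Count shared neighbors of every (reference, query) cell pair, then convert
# counts to Jaccard indices: each cell has 2 * n_neighbors neighbors in total,
# so |A ∩ B| / |A ∪ B| = n_shared / (4 * n_neighbors - n_shared)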
jaccard = (xx @ yx.T) + (xy @ yy.T)
jaccard.data /= 4 * n_neighbors - jaccard.data
normalized_jaccard = jaccard.multiply(1 / jaccard.sum(axis=0))
onehot = sklearn.preprocessing.OneHotEncoder()
xtab = onehot.fit_transform(ref.obs[[field]])
ytab = normalized_jaccard.T @ xtab
pred = pd.Series(
onehot.categories_[0][ytab.argmax(axis=1).A1],
index=query.obs_names, dtype=ref.obs[field].dtype
)
conf = pd.Series(
ytab.max(axis=1).toarray().ravel(),
index=query.obs_names
)
key_added = key_added or field
query.obs[key_added] = pred
query.obs[key_added + "_confidence"] = conf
def bedmap2anndata(
bedmap: os.PathLike, var_col: int = 3, obs_col: int = 6
) -> AnnData:
r"""
Convert bedmap result to :class:`anndata.AnnData` object
Parameters
----------
bedmap
Path to bedmap result
var_col
Variable column (0-based)
obs_col
Observation column (0-based)
Returns
-------
adata
Converted :class:`anndata.AnnData` object
Note
----
Similar to ``rliger::makeFeatureMatrix``,
but more automated and memory efficient.
"""
bedmap = pd.read_table(bedmap, sep="\t", header=None, usecols=[var_col, obs_col])
var_names = pd.Index(sorted(set(bedmap[var_col])))
bedmap = bedmap.dropna()
var_pool = bedmap[var_col]
obs_pool = bedmap[obs_col].str.split(";")
obs_names = pd.Index(sorted(set(chain.from_iterable(obs_pool))))
X = scipy.sparse.lil_matrix((var_names.size, obs_names.size)) # Transposed
for obs, var in tqdm(zip(obs_pool, var_pool), total=bedmap.shape[0], desc="bedmap2anndata"):
row = obs_names.get_indexer(obs)
col = var_names.get_loc(var)
X.rows[col] += row.tolist()
X.data[col] += [1] * row.size
X = X.tocsc().T # Transpose back
X.sum_duplicates()
return AnnData(
X=X, obs=pd.DataFrame(index=obs_names),
var=pd.DataFrame(index=var_names),
dtype=X.dtype
)
@logged
def estimate_balancing_weight(
*adatas: AnnData, use_rep: Optional[str] = None, use_batch: Optional[str] = None,
resolution: float = 1.0, cutoff: float = 0.5, power: float = 4.0,
key_added: str = "balancing_weight"
) -> None:
r"""
Estimate balancing weights in an unsupervised manner
Parameters
----------
*adatas
Datasets to be balanced
use_rep
Data representation based on which to match clusters
use_batch
Estimate balancing weights per batch
(batch keys and categories must match across all datasets)
resolution
Leiden clustering resolution
cutoff
Cosine similarity cutoff
power
Cosine similarity power (for increasing contrast)
key_added
New ``obs`` key added for the balancing weight
Note
----
While the joint similarity array has a size of :math:`K^n`
(where :math:`K` is the average number of clusters per dataset,
and :math:`n` is the number of datasets), a sparse implementation
is used, so scalability with respect to the number of datasets remains good.
"""
if use_batch: # Recurse per batch
estimate_balancing_weight.logger.info("Splitting batches...")
adatas_per_batch = defaultdict(list)
for adata in adatas:
groupby = adata.obs.groupby(use_batch, dropna=False)
for b, idx in groupby.indices.items():
adata_sub = adata[idx]
adatas_per_batch[b].append(AnnData(
obs=adata_sub.obs,
obsm={use_rep: adata_sub.obsm[use_rep]}
))
if len(set(len(items) for items in adatas_per_batch.values())) != 1:
raise ValueError("Batches must match across datasets!")
for b, items in adatas_per_batch.items():
estimate_balancing_weight.logger.info("Processing batch %s...", b)
estimate_balancing_weight(
*items, use_rep=use_rep, use_batch=None,
resolution=resolution, cutoff=cutoff,
power=power, key_added=key_added
)
estimate_balancing_weight.logger.info("Collating batches...")
collates = [
pd.concat([item.obs[key_added] for item in items])
for items in zip(*adatas_per_batch.values())
]
for adata, collate in zip(adatas, collates):
adata.obs[key_added] = collate.loc[adata.obs_names]
return
if use_rep is None:
raise ValueError("Missing required argument `use_rep`!")
adatas_ = [
AnnData(
obs=adata.obs.copy(deep=False).assign(n=1),
obsm={use_rep: adata.obsm[use_rep]}
) for adata in adatas
] # Avoid unwanted updates to the input objects
estimate_balancing_weight.logger.info("Clustering cells...")
for adata_ in adatas_:
sc.pp.neighbors(
adata_, n_pcs=adata_.obsm[use_rep].shape[1],
use_rep=use_rep, metric="cosine"
)
sc.tl.leiden(adata_, resolution=resolution)
leidens = [
aggregate_obs(
adata, by="leiden", X_agg=None,
obs_agg={"n": "sum"}, obsm_agg={use_rep: "mean"}
) for adata in adatas_
]
us = [normalize(leiden.obsm[use_rep], norm="l2") for leiden in leidens]
ns = [leiden.obs["n"] for leiden in leidens]
estimate_balancing_weight.logger.info("Matching clusters...")
cosines = []
for i, ui in enumerate(us):
for j, uj in enumerate(us[i + 1:], start=i + 1):
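# Cosine similarity between cluster centroids of datasets i and j; similarities
# below `cutoff` are zeroed and the rest raised to `power` to sharpen the
# matching before forming the sparse joint product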
cosine = ui @ uj.T
cosine[cosine < cutoff] = 0
cosine = COO.from_numpy(cosine)
cosine = np.power(cosine, power)
key = tuple(
slice(None) if k in (i, j) else np.newaxis
for k in range(len(us))
) # To align axes
cosines.append(cosine[key])
joint_cosine = num.prod(cosines)
estimate_balancing_weight.logger.info(
"Matching array shape = %s...", str(joint_cosine.shape)
)
estimate_balancing_weight.logger.info("Estimating balancing weight...")
for i, (adata, adata_, leiden, n) in enumerate(zip(adatas, adatas_, leidens, ns)):
balancing = joint_cosine.sum(axis=tuple(
k for k in range(joint_cosine.ndim) if k != i
)).todense() / n
balancing = pd.Series(balancing, index=leiden.obs_names)
balancing = balancing.loc[adata_.obs["leiden"]].to_numpy()
balancing /= balancing.sum() / balancing.size
adata.obs[key_added] = balancing
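# Example usage (a minimal sketch; `rna`/`atac` and the "X_glue" embedding are
# hypothetical; weights are written to ``obs["balancing_weight"]`` by default):
#
#     estimate_balancing_weight(rna, atac, use_rep="X_glue")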
def _metacell_corr(
*adatas: AnnData, skeleton: Optional[nx.Graph] = None, method: str = "spr",
prep_fns: Optional[List[Optional[Callable[[AnnData], None]]]] = None
) -> nx.Graph:
if skeleton is None:
raise ValueError("Missing required argument `skeleton`!")
if set.intersection(*(set(adata.var_names) for adata in adatas)):
raise ValueError("Overlapping features are currently not supported!")
prep_fns = prep_fns or [None] * len(adatas)
if not len(prep_fns) == len(adatas):
raise ValueError("Length of `prep_fns` must match the number of datasets!")
for adata, prep_fn in zip(adatas, prep_fns):
if prep_fn:
prep_fn(adata)
adata = ad.concat(adatas, axis=1)
edgelist = nx.to_pandas_edgelist(skeleton)
source = adata.var_names.get_indexer(edgelist["source"])
target = adata.var_names.get_indexer(edgelist["target"])
X = num.densify(adata.X.T)
if method == "spr":
X = np.array([scipy.stats.rankdata(x) for x in X])
elif method != "pcc":
raise ValueError(f"Unrecognized method: {method}!")
mean = X.mean(axis=1)
meansq = np.square(X).mean(axis=1)
std = np.sqrt(meansq - np.square(mean))
edgelist["corr"] = np.array([
((X[s] * X[t]).mean() - mean[s] * mean[t]) / (std[s] * std[t])
for s, t in zip(source, target)
])
return nx.from_pandas_edgelist(edgelist, edge_attr=True, create_using=type(skeleton))
def _metacell_regr(
*adatas: AnnData, skeleton: Optional[nx.DiGraph] = None,
model: str = "Lasso", **kwargs
) -> nx.DiGraph:
if skeleton is None:
raise ValueError("Missing required argument `skeleton`!")
for adata in adatas:
sc.pp.normalize_total(adata)
sc.pp.log1p(adata)
if set.intersection(*(set(adata.var_names) for adata in adatas)):
raise ValueError("Overlapping features are currently not supported!")
adata = ad.concat(adatas, axis=1)
targets = [node for node, in_degree in skeleton.in_degree() if in_degree]
biadj = biadjacency_matrix(
skeleton, adata.var_names, targets, weight=None
).astype(bool).T.tocsr()
X = num.densify(adata.X)
Y = num.densify(adata[:, targets].X.T)
coef = []
model = getattr(sklearn.linear_model, model)
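# Fit one regression model per target feature, using only the source features
# linked to it in the skeleton graph as predictors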
for target, y, mask in tqdm(zip(targets, Y, biadj), total=len(targets), desc="metacell_regr"):
X_ = X[:, mask.indices]
lm = model(**kwargs).fit(X_, y)
coef.append(pd.DataFrame({
"source": adata.var_names[mask.indices],
"target": target,
"regr": lm.coef_
}))
coef = pd.concat(coef)
return nx.from_pandas_edgelist(coef, edge_attr=True, create_using=type(skeleton))