Source code for MidlineIdentifier.adata_class

import scanpy as sc
from anndata import AnnData

import numpy as np
import pandas as pd


import matplotlib.pyplot as plt
from adjustText import adjust_text

import os
import logging

from . import utilis

[docs] class Adata: """ Adata object to wrap anndata Attributes ---------- adata : :class:`anndata.AnnData` anndata object to store the single cell data. Compatible with all scanpy functions. outdir : :class:`str` output directory to save files Methods ------- Preprocessing EnrichBins FindOrientation FindDEG FindSVG """ def __init__(self, fad, sample, outdir): """ Constructs all the necessary attributes for the :class:`~MidlineIdentifier.adata_class.Adata`. Parameters ---------- fad : :class:`str` Path to single cell object. Will be load into an anndata object. sample : :class:`str` Sample identifier. Will be store in ``adata.obs['sample']``. Useful when concanating multiple :class:`~MidlineIdentifier.adata_class.Adata` instances. outdir : :class:`str` Output directory to save files """ self.adata = sc.read_h5ad(fad) self.adata.obs['sample'] = sample self.outdir = outdir
[docs] def Preprocessing(self): """ Preprocessing of single cell dataset. This function calls :func:`scanpy.pp.normalize_total` and :func:`scanpy.pp.log1p`. Raw data, normalized data and log data will be stored into ``.layers['counts']``, ``.layers["norm_counts"]`` and ``.layers["lognorm_counts"]`` respectively. """ adata = self.adata if adata.raw is None and isinstance(adata.X.max(), (int, np.integer)) : adata = adata[:, adata.var_names != 'nan'] if adata.n_vars == 0: logging.warning('No detected genes in the adata.') adata.raw = adata adata.layers["counts"] = adata.X.copy() # storing raw counts in layers sc.pp.normalize_total(adata) adata.layers["norm_counts"] = adata.X.copy() sc.pp.log1p(adata) adata.layers["lognorm_counts"] = adata.X.copy() self.adata = adata
[docs] def EnrichBins(self, genes, coords, nbin = 4, score_name = 'score', **kwargs): """ Perform gene set enrichment in bins. This function calls :func:`scanpy.tl.score_genes`. Result will be stored into ``.uns[score_name]``. Parameters ---------- genes : :class:`list` | :class:`str` The list of gene names used for score calculation coords : :class:`str` The key of the observations to consider. Must be one of the ``.obs.columns`` nbin : :class:`int` The number of bins score_name : :class:`str` Name of the field to be added in ``.uns`` kwargs Additonal arguments to pass to :func:`scanpy.tl.score_genes` """ adata = self.adata genes = set(genes) & set(adata.var_names) logging.info(', '.join(genes) + 'are used to calculate enrichment.') cut_index = pd.cut(adata.obs[coords], nbin, labels = False, duplicates='drop', retbins=False) adata.obs['bins'] = cut_index out = utilis.grouped_obs(adata, 'bins', method = 'mean') adata_bins = sc.AnnData(out.transpose(), out.columns.to_frame(), out.index.to_frame()) try: sc.tl.score_genes(adata_bins, genes, n_bins = 25, **kwargs) score = adata_bins.obs['score'].tolist() except IndexError: sc.tl.score_genes(adata_bins, genes, n_bins = 10, **kwargs) score = adata_bins.obs['score'].tolist() except ValueError: score = None adata.uns[score_name] = score self.adata = adata
[docs] def FindOrientation(self, coords = 'major_coor_scaled', start_genes = ['Sox9','Acan','Col2a1','Col9a1','Col9a2','Col11a1'], end_genes = ['Col1a1', 'Col3a1'], plot = True, **kwargs): """ Orient the coords based on the provided genelists. This allows cross-dataset/structure comparisons. Result will be stored into ``.uns[start_score]`` and ``.uns[end_score]``. Parameters ---------- coords : :class:`str` The key of the observations to consider. Must be one of the ``.obs.columns`` start_genes : :class:`list` The list of gene names used to calculate the start end_genes : :class:`list` The list of gene names used to calculate the end plot : :class:`bool` (default: `True`) Set to ``True`` by default. If True, save teh plot into ``'Orientation_score.pdf'`` in the output directory (`.outdir`) kwargs Additonal arguments to pass to :func:`~MidlineIdentifier.adata_class.Adata.EnrichBins` """ self.EnrichBins(start_genes, coords, score_name = 'start_score', **kwargs) self.EnrichBins(end_genes, coords, score_name = 'end_score', **kwargs) adata = self.adata ss, es = adata.uns['start_score'], adata.uns['end_score'] if ss[0] < ss[-1]: # reverse the path # self.start, self.end = self.end, self.start adata.obs['major_coor_used'] = 1 - adata.obs[coords] else: adata.obs['major_coor_used'] = adata.obs[coords] # max_s, max_e = max(ss), max(es) # if max_s > Thre and max_e > Thre: # adata.obs['loc'] = 'P' # adata.obs.loc[adata.obs['major_coor_used'] > 0.5, 'loc'] = 'D' # else: # adata.obs['loc'] = 'A' self.adata = adata if plot: fig, ax = plt.subplots(1, 2, figsize = (8, 4)) if ss is not None: ax[0].scatter(range(len(ss)), ss) ax[0].axhline(0, color = 'r', linestyle = 'dashed') ax[0].set_title('Start score') if es is not None: ax[1].scatter(range(len(es)), es) ax[1].axhline(0, color = 'r', linestyle = 'dashed') ax[1].set_title('End score') plt.savefig(os.path.join(self.outdir, 'Orientation_score.pdf'))
[docs] def FindDEG(self, groupby, condition, method, **kwargs): """ Finds differentially expressed genes (DEGs) for each of the identity classes in a dataset. Parameters ---------- adata : :class:`anndata.AnnData` Annotated data matrix. groupby : :class:`str` The key of the observations grouping to consider. method : :class:`str` (`default: 'DESeq2'`) Method used to calcualte DEGs. ``'DESeq2'`` and ``'DESeq2_pb'`` use `pydeseq2 <https://github.com/owkin/PyDESeq2?tab=readme-ov-file>`_, the python implementation of the `DESeq2 <https://genomebiology.biomedcentral.com/articles/10.1186/s13059-014-0550-8>`_ method. ``DESeq2`` calculates DEGs on single cell level while ``DESeq2_pb`` generate pseudobulk expression based on ``condition``. ``'t-test'``, ``'t-test_overestim_var'`` overestimates variance of each group, ``'wilcoxon'`` uses Wilcoxon rank-sum, ``'logreg'`` uses logistic regression. If method is one of ``['logreg', 't-test', 'wilcoxon', 't-test_overestim_var']``, This function directly calls :func:`scanpy.tl.rank_genes_groups`. kwargs Additonal arguments to pass to :func:`scanpy.tl.rank_genes_groups` Returns ------- :class:`pandas.DataFrame` """ # groups = [ref], reference = g, if method.startswith("DESeq2"): if method == "DESeq2_pb": groupby_ = condition else: groupby_ = groupby all_ = self.adata.obs[groupby_].unique().tolist() # check groups if 'groups' not in kwargs.keys(): groups = all_ else: groups = kwargs['groups'] if isinstance(groups, (str, int)): groups = [groups] # check reference if 'reference' not in kwargs.keys(): reference = all_ else: reference = kwargs['reference'] if isinstance(reference, str): reference = [reference] diff = set(reference) - set(all_) if len(diff) > 0: raise ValueError( f"reference = {diff} is not one of groupby = {all_}.") logging.info("Accessing adata.layers['counts'] to get the raw counts") adata_comp = self.adata[self.adata.obs[groupby_].isin(set(groups + reference))].copy() if method == "DESeq2_pb": df = utilis.grouped_obs(adata_comp, groupby = groupby, method = 'sum', layer = 'counts') g2c = dict(zip(adata_comp.obs[groupby],adata_comp.obs[condition])) meta = np.array([g2c.get(g) for g in df.columns]) res_list = [] for group_i in groups: reference_i = list(set(reference)-set(group_i)) if len(reference_i) == 0: logging.warning('skipping comparison') continue col1_idx = np.where(meta == group_i)[0] col2_idx = np.where(meta == reference_i)[0] res_tmp = _deseq2(df, col1_idx, col2_idx) res_tmp.insert(0, 'group', [group_i] * len(res_tmp)) res_list.append(res_tmp) res = pd.concat(res_list, axis = 0) else: # TODO: DESeq2 on single cell, very slow res_list = [] for group_i in groups: reference_i = list(set(reference)-set(group_i)) if len(reference_i) == 0: logging.warning('skipping comparison') continue cells1_idx = np.where(adata_comp.obs[groupby] == group_i)[0] cells2_idx = np.where(adata_comp.obs[groupby].isin(reference_i))[0] res_list.append(_deseq2(adata_comp, cells1_idx, cells2_idx)) res = pd.concat(res_list, axis = 1) else: sc.tl.rank_genes_groups(self.adata, groupby = groupby, method=method, **kwargs) # All groups are returned if groups is None res = sc.get.rank_genes_groups_df(self.adata, group = None) # add group column to res if len(groups) == 1 if 'group' not in res.columns: g = list(self.adata.uns['rank_genes_groups']["names"].dtype.names) res.insert(0, 'group', g * len(res)) return res
[docs] def FindSVG(self, coords, sample, layer = 'counts', min_exp_gene = 0, min_exp_cell = 0): """ Finds spatially variable genes (SVGs) for each of the identity classes in a dataset. This function incoporate :func:`SpatialDE`. Raw counts should be used. Parameters ---------- sample : :class:`str` (default: `'sample'`) Sample identifier. Must be one of the `.obs.columns` coords : :class:`str` Spatial coordinates for each cell. Can be one of the ``.obs.columns`` or a :class:`pandas.DataFrame` with rows as cells and columns as spatial dimensions. layer : :class:`str` (default: `'counts'`) Key from `adata.layers` whose value will be used to. If None, .`adata.layers['counts']` will be used. min_exp_gene : :class:`int` (default: `'0'`) Filter genes whose expression lower than this min_exp_cell : :class:`int` (default: `'0'`) Filter cells whose total expression lower than this """ if isinstance(coords, str) and (coords in self.adata.obs.columns): coords = self.adata.obs[coords] elif isinstance(coords, pd.DataFrame): if len(coords) != self.adata.n_obs: raise ValueError(f"The provided coords must have the same size as the number of testing cells {self.adata.n_obs}.") else: raise ValueError(f"The provided coords should be either one columns in adata.obs or a pandas.DataFrame.") # print(coords[counts.index]) sample_unqiue = self.adata.obs[sample].unique() for s in sample_unqiue: adata = self.adata[self.adata.obs[sample] == s] counts = pd.DataFrame(adata.layers[layer], columns=adata.var_names, index=adata.obs_names) # Filter 'nan' genes counts = counts.loc[:,counts.columns != 'nan'] # Filter practically unobserved genes counts = counts.loc[:,counts.sum(0) >= min_exp_gene] counts = counts.loc[counts.sum(1) >= min_exp_cell,: ] sample_info = pd.DataFrame({ 'coords': coords.loc[counts.index], 'total_counts':counts.sum(1)}, index = counts.index) _spatialde(counts, sample_info, self.outdir, s)
[docs] def PolarizationScoring(self, genes, norm = False, coords = 'major_coor_used', bootstrapping = False, n_bs = 1000, random_state = 1234): """ Calculate the polarization score for genes. Parameters ---------- genes : :class:`list` | :class:`str` genes to calculate bootstrapping : :class:`bool` (default : `False`) Whether to bootstrapping or not n_bs : :class:`int` (default : `100`) The number of bootstrapping to perform random_state : :class:`int` (default : `1234`) Set random seed for reproducibility """ adata = self.adata.adata if g not in adata.var_names: logging.warning("The requested gene %s is not in adata! Returning None", g) return None aggr_ = utilis.get_gene_expr(adata, g, norm = norm, coords = coords) if bootstrapping: np.random.seed(random_state) score = [] for _ in range(n_bs): idx = np.random.choice(aggr_.index, size = len(aggr_), replace = True) df = aggr_.iloc[idx,:] score.append(_PolarizationScoring(df)) # score = np.array(score) / (10 * np.std(score)) bs = bootstrap((score, ), np.median, confidence_level = 0.95,random_state = random_state, method = 'percentile') CI = bs.confidence_interval return (score, CI) else: return _PolarizationScoring(aggr_)
from pydeseq2.dds import DeseqDataSet from pydeseq2.default_inference import DefaultInference from pydeseq2.ds import DeseqStats # gene_mat = dr_utilis.grouped_obs_single(adata, groupby = 'sample', use_raw = True, method = 'sum') # raw counts # gene_mat_PD = dr_utilis.grouped_obs_single(adata_PD, groupby = 'comp', use_raw = True, method = 'sum') # raw counts # gene_mat_P = gene_mat_PD.iloc[:,gene_mat_PD.columns.str.endswith('P')] # gene_mat_D = gene_mat_PD.iloc[:,gene_mat_PD.columns.str.endswith('D')] def _deseq2(data, cells1, cells2, **kwagrs): """ Finds differentially expressed genes (DEGs) using DESeq2 for one comparison. Parameters ---------- data : :class:`anndata.AnnData` | :class:`pandas.DataFrame` Raw expression counts. If :class:`anndata.AnnData`, `.layers["counts"]` will be used. If :class:`pandas.DataFrame`, genes as rows and cells/samples as columns cells1 : :class:`list` Cells index for condition 1 cells2 : :class:`list` Cells index for condition 2 (reference) **kwagrs Additonal arguments to pass to :func:`pydeseq2.dds.DeseqDataSet` Returns ------- :class:`pandas.DataFrame` """ if isinstance(data, AnnData): df = np.concatenate([data.layers["counts"][cells1], data.layers["counts"][cells2]], axis = 0) meta = pd.DataFrame({'comp': ['G1']* len(cells1) + ['G2']* len(cells2)}, index = df.index) elif isinstance(data, pd.DataFrame): df = pd.concat([data.iloc[:,cells1], data.iloc[:,cells2]], axis = 1).transpose() meta = pd.DataFrame({'comp': ['G1']* len(cells1) + ['G2']* len(cells2)}, index = df.index) else: raise ValueError('Unrecognized input format.') # print(df) # print(meta) dds = DeseqDataSet( counts=df, metadata=meta, design_factors="comp", ref_level = ['comp', 'G2'], refit_cooks=False, inference=DefaultInference(n_cpus=1), **kwagrs ) dds.deseq2() stat_res = DeseqStats(dds) stat_res.summary() res = stat_res.results_df.sort_values('padj') return res import NaiveDE import SpatialDE import SpatialDE.plot def _spatialde(counts, sample_info, outdir, sample = 'sample', cal_pattern = True, random_state = 1234): """ Finds spatially variable genes (SVGs) for one sample. THis function will save two / four files in the specified folder - '1.FSV_sig_' + s + '.pdf': Plot of Fraction Spatial Variance vs Q-value - '2.SVG_filter_' + s + '.csv': Statistics of significant SVG. See `here <https://github.com/Teichlab/SpatialDE>`_ for more detailed documentatiton. If `cal_pattern = True`: - '3.Pattern_' + s + '.pdf': Spatial patterns of gene expression - '4.Pattern_members_' + s + '.csv': Gene membership of spatial patterns Parameters ---------- counts : :class:`pandas.DataFrame` Raw expression counts. Genes as columns and cells/samples as rows. outdir : :class:`str` Output directory sample: :class:`str` (default: `'sample'`) Sample identifier. Will be used as suffix of the output file. cal_pattern: :class:`bool` (default: `'True'`) If True, to detect the spatial patterns of gene expression and assign genes to patterns. This callls the :func:`SpatialDE.spatial_patterns` random_state : :class:`int` (default : `'1234'`) Set random seed for reproducibility """ np.random.seed(random_state) counts_n = NaiveDE.stabilize(counts.T).T counts_r = NaiveDE.regress_out(sample_info, counts_n.T, 'np.log(total_counts)').T X = sample_info['coords'].values[:, None] # do not change. it's here for a reason results = SpatialDE.run(X, counts_r) results['pval'] = results['pval'].clip(lower = results.query('pval > 0')['pval'].min() / 2) results['qval'] = results['qval'].clip(lower = results.query('qval > 0')['qval'].min() / 2) plt.clf() plt.figure(figsize=(10,6)) SpatialDE.plot.FSV_sig(results) text_list = [] for i in range(results.shape[0]): if results.qval[i] < 0.05: text_list.append(plt.text(results.FSV[i], results.pval[i], results.g[i], fontsize = 12)) if len(text_list) >0: adjust_text(text_list, arrowprops=dict(arrowstyle="-", color='k', lw=0.5)) plt.title(sample) plt.tight_layout() fn = '1.FSV_sig_' + sample + '.pdf' plt.savefig(os.path.join(outdir, fn)) plt.clf() sig_res = results.query('qval < 0.05').copy() sig_res['sample'] = sample fn = '2.SVG_filter_' + sample + '.csv' if not cal_pattern: sig_res.to_csv(os.path.join(outdir, fn),index = False) else: logging.info('Pattern calculation...') pattern_results, patterns = SpatialDE.spatial_patterns(X, counts_n, sig_res, C = 4, l = 0.2, verbosity=1) comb_results = sig_res.join(pattern_results.set_index('g'), on='g') comb_results.to_csv(os.path.join(outdir, fn),index = False) fig, ax = plt.subplots(2, 2, figsize = (6, 6)) axs = ax.ravel() for i, p in enumerate(patterns.columns): C = patterns[p] axs[i].scatter(X, C, s = 10) axs[i].vlines(0.5, min(C), max(C), linestyles = 'dashed', colors = 'r') axs[i].set_yticks([]) axs[i].set_title('Pattern {} - {} genes'.format(i, pattern_results.query('pattern == @i').shape[0])) plt.tight_layout() fn = '3.Pattern_' + sample + '.pdf' plt.savefig(os.path.join(outdir, fn)) fn = '4.Pattern_members_' + sample + '.csv' pattern_results.sort_values('pattern').to_csv(os.path.join(outdir, fn), index = False) def _PolarizationScoring(aggr_): m = np.mean(aggr_['expr']) s = np.sum(aggr_['expr'] > 0) score = np.sum((aggr_['coords']-0.5) * aggr_['expr']) scale1 = np.sum((aggr_['expr'] - m)**2) if scale1 == 0: scale1 = 1 scale = scale1 * np.sum((aggr_['coords'] - 0.5)**2) return s * score / scale