# Source code for lXtractor.ext.hmm

"""
Wrappers around PyHMMer for convenient annotation of domains and families.
"""
from __future__ import annotations

import gzip
import logging
import typing as t
from collections import abc
from itertools import count
from pathlib import Path
from shutil import rmtree

import pandas as pd
from more_itertools import peekable, split_at
from pyhmmer.easel import (
    DigitalSequence,
    TextSequence,
    DigitalSequenceBlock,
    TextMSA,
    DigitalMSA,
    Alphabet,
)
from pyhmmer.plan7 import (
    HMM,
    HMMFile,
    Pipeline,
    TopHits,
    Alignment,
    Domain,
    TraceAligner,
    Builder,
    Background,
)

from lXtractor.chain import ChainSequence, ChainStructure, Chain
from lXtractor.core import Alignment as lXAlignment
from lXtractor.core.base import AbstractResource
from lXtractor.core.exceptions import MissingData
from lXtractor.util import fetch_to_file

LOGGER = logging.getLogger(__name__)

# Fallback map name used when an HMM model exposes neither accession nor name.
HMM_DEFAULT_NAME = "HMM"
# File names of the raw Pfam-A data as distributed by InterPro/EBI.
PFAM_HMM_NAME = "Pfam-A.hmm.gz"
PFAM_DAT_NAME = "Pfam-A.hmm.dat.gz"
# Download URLs pointing at the current Pfam-A release.
PFAM_HMM_URL = "https://ftp.ebi.ac.uk/pub/databases/Pfam/current_release/Pfam-A.hmm.gz"
PFAM_DAT_URL = (
    "https://ftp.ebi.ac.uk/pub/databases/Pfam/current_release/Pfam-A.hmm.dat.gz"
)
# Default local storage location for fetched/parsed Pfam data.
PFAM_LOC = Path(__file__).parent.parent / "resources" / "Pfam"
# Chain-like objects accepted for annotation.
_ChainT: t.TypeAlias = Chain | ChainStructure | ChainSequence
# Anything convertible into a digital sequence (see :func:`digitize_seq`).
_SeqT: t.TypeAlias = _ChainT | str | tuple[str, str] | DigitalSequence
# Acceptable inputs designating one or more HMM models.
_HmmInpT: t.TypeAlias = HMM | HMMFile | Path | str
CT = t.TypeVar("CT", bound=t.Union[ChainSequence, ChainStructure, Chain])


# TODO: verify non-domain HMM types (e.g., Motif, Family) yield valid Domain hits
def iter_hmm(hmm: _HmmInpT) -> abc.Generator[HMM]:
    """
    Iterate over HMM models.

    :param hmm: A path to an HMM file, opened ``HMMFile`` or a stream.
    :return: An iterator over individual HMM models.
    """
    if isinstance(hmm, HMM):
        # Already a parsed model -- nothing to iterate over.
        yield hmm
    elif isinstance(hmm, HMMFile):
        yield from hmm
    else:
        # Path/str/stream: open, then yield every model in the file.
        with HMMFile(hmm) as handle:
            yield from handle
def _enumerate_numbering(
    a: Alignment,
) -> abc.Generator[tuple[int | None, int | None], None, None]:
    # Walk the aligned HMM/target strings in lockstep, emitting
    # (sequence position, HMM node) pairs. An insertion relative to the
    # model (".") yields no HMM node; a deletion ("-") yields no sequence
    # position.
    next_hmm = count(a.hmm_from)
    next_seq = count(a.target_from)
    for hmm_char, seq_char in zip(a.hmm_sequence, a.target_sequence):
        hmm_num = next(next_hmm) if hmm_char != "." else None
        seq_num = next(next_seq) if seq_char != "-" else None
        yield seq_num, hmm_num


def _get_alphabet(alphabet: Alphabet | str):
    # Resolve a string spec to the corresponding PyHMMer alphabet; an
    # already initialized Alphabet (or any non-str object) passes through.
    if not isinstance(alphabet, str):
        return alphabet
    key = alphabet.lower()
    if key == "amino":
        return Alphabet.amino()
    if key == "dna":
        return Alphabet.dna()
    if key == "rna":
        return Alphabet.rna()
    raise ValueError(f"Invalid alphabet type {alphabet}.")
[docs] def digitize_seq(obj: t.Any, alphabet: Alphabet | str = "amino") -> DigitalSequence: """ :param obj: A `Chain*`-type object or string or a tuple of (name, _seq). A sequence of this object must be compatible with the alphabet of the HMM model. :param alphabet: An alphabet type the sequence corresponds to. Can be an initialized PyHMMer alphabet or a string "amino", "dna", or "rna". :return: A digitized sequence compatible with PyHMMer. """ alphabet = _get_alphabet(alphabet) match obj: case DigitalSequence(): return obj case TextSequence(): return obj.digitize(alphabet) case str(): _id = str(hash(obj)) accession, name, text = _id, _id, obj case [str(), str()]: accession, name, text = obj[0], obj[0], obj[1] case ChainSequence(): accession, name, text = obj.id, obj.name, obj.seq1 case ChainStructure() | Chain(): accession, name, text = obj.id, obj.id, obj.seq.seq1 case _: raise TypeError(f"Unsupported sequence type {type(obj)}") return TextSequence( sequence=text, name=bytes(name, encoding="utf-8"), accession=bytes(accession, encoding="utf-8"), ).digitize(alphabet)
class PyHMMer:
    """
    A basic pyhmmer interface aimed at domain extraction.

    It works with a single hmm model and pipeline instance.

    `The original documentation <https://pyhmmer.readthedocs.io/en/stable/>`.
    """
[docs] def __init__(self, hmm: _HmmInpT, **kwargs): """ :param hmm: An :class:`HMMFile` handle or path as string or `Path` object to a file containing a single HMM model. In case of multiple models, only the first one will be taken :param kwargs: Passed to :class:`Pipeline`. The `alphabet` argument is derived from the supplied `hmm`. """ try: #: HMM instance self.hmm = next(iter_hmm(hmm)) except (StopIteration, EOFError) as e: raise MissingData(f"Invalid input {hmm}") from e #: Pipeline to use for HMM searches self.pipeline: Pipeline = self.init_pipeline(**kwargs) #: Hits resulting from the most recent HMM search self.hits_: TopHits | None = None
[docs] @classmethod def from_hmm_collection(cls, hmm: _HmmInpT, **kwargs) -> abc.Generator[t.Self]: """ Split HMM collection and initialize a :class:`PyHMMer` instance from each HMM model. :param hmm: A path to HMM file, opened HMMFile handle, or parsed HMM. :param kwargs: Passed to the class constructor. :return: A generator over :class:`PyHMMer` instances created from the provided HMM models. """ yield from (cls(inp, **kwargs) for inp in iter_hmm(hmm))
[docs] @classmethod def from_msa( cls, msa: abc.Iterable[tuple[str, str] | str | _ChainT] | lXAlignment, name: str | bytes, alphabet: Alphabet | str, **kwargs, ) -> t.Self: """ Create a :class:`PyHMMer` instance from a multiple sequence alignment. :param msa: An iterable over sequences. :param name: The HMM model's name. :param alphabet: An alphabet to use to build the HMM model. See :func:`digitize_seq` for available options. :param kwargs: Passed to :class:`DigitalMSA` of ``PyHMMer`` that serves as the basis for creating an HMM model. :return: A new :class:`PyHMMer` instance initialized with the HMM model built here. """ if isinstance(name, str): name = bytes(name, "utf-8") alphabet = _get_alphabet(alphabet) msa_d = DigitalMSA( alphabet, name, sequences=list(map(digitize_seq, msa)), **kwargs, ) builder = Builder(alphabet) background = Background(alphabet) hmm, _, _ = builder.build_msa(msa_d, background) return cls(hmm)
[docs] def init_pipeline(self, **kwargs) -> Pipeline: """ :param kwargs: Passed to :class:`Pipeline` during initialization. :return: Initialized pipeline, also saved to :attr:`pipeline`. """ self.pipeline = Pipeline(self.hmm.alphabet, **kwargs) return self.pipeline
[docs] def convert_seq(self, obj: t.Any) -> DigitalSequence: """ :param obj: A `Chain*`-type object or string or a tuple of (name, _seq). A sequence of this object must be compatible with the alphabet of the HMM model. :return: A digitized sequence compatible with PyHMMer. """ return digitize_seq(obj, self.hmm.alphabet)
def _convert_to_seq_block(self, seqs: abc.Iterable[_SeqT]) -> DigitalSequenceBlock: return DigitalSequenceBlock(self.hmm.alphabet, map(self.convert_seq, seqs))
[docs] def search(self, seqs: abc.Iterable[_SeqT]) -> TopHits: """ Run the :attr:`pipeline` to search for :attr:`hmm`. :param seqs: Iterable over digital sequences or objects accepted by :meth:`convert_seq`. :return: Top hits resulting from the search. """ if self.pipeline is None: self.init_pipeline() seqs_block = self._convert_to_seq_block(seqs) self.hits_ = self.pipeline.search_hmm(self.hmm, seqs_block) return self.hits_
[docs] def align(self, seqs: abc.Iterable[_SeqT]) -> TextMSA: """ Align sequences to a profile. :param seqs: Sequences to align. :return: :class:`TextMSA` with aligned sequences. """ block = self._convert_to_seq_block(seqs) aligner = TraceAligner() traces = aligner.compute_traces(self.hmm, block) msa = aligner.align_traces(self.hmm, block, traces) assert isinstance(msa, TextMSA), "Unexpected MSA type returned" return msa
[docs] def annotate( self, objs: abc.Iterable[_ChainT] | _ChainT, new_map_name: str | None = None, min_score: float | None = None, min_size: int | None = None, min_cov_hmm: float | None = None, min_cov_seq: float | None = None, domain_filter: abc.Callable[[Domain], bool] | None = None, **kwargs, ) -> abc.Generator[CT, None, None]: """ Annotate provided objects by hits resulting from the HMM search. An annotation is the creation of a child object via :meth:`spawn_child` method (e.g., :meth:`lXtractor.core.chain.ChainSequence.spawn_child`). :param objs: A single one or an iterable over `Chain*`-type objects. :param new_map_name: A name for a child :class:`ChainSequence <lXtractor.core.chain.ChainSequence` to hold the mapping to the hmm numbering. :param min_score: Min hit score. :param min_size: Min hit size. :param min_cov_hmm: Min HMM model coverage -- a fraction of mapped / total nodes. :param min_cov_seq: Min coverage of a sequence by the HMM model nodes -- a fraction of mapped nodes to the sequence's length. :param domain_filter: A callable to filter domain hits. :param kwargs: Passed to the `spawn_child` method. **Hint:** if you don't want to keep spawned children, pass ``keep=False`` here. :return: A generator over spawned children yielded sequentially for each input object and valid domain hit. 
""" def accept_domain(d: Domain, cov_hmm: float, cov_seq: float) -> bool: acc_cov_hmm = min_cov_hmm is None or cov_hmm >= min_cov_hmm acc_cov_seq = min_cov_seq is None or cov_seq >= min_cov_seq acc_score = min_score is None or d.score >= min_score acc_size = min_size is None or ( d.alignment.target_to - d.alignment.target_from >= min_size ) return all([acc_cov_hmm, acc_cov_seq, acc_score, acc_size]) if isinstance(objs, _ChainT): objs = [objs] else: peeking = peekable(objs) fst = peeking.peek(False) if not fst: raise MissingData("No sequences provided") if not isinstance(fst, _ChainT): raise TypeError(f"Unsupported type {fst}") if new_map_name is None: try: new_map_name = self.hmm.accession.decode("utf-8").split(".")[0] except (AttributeError, ValueError): try: new_map_name = ( self.hmm.name.decode("utf-8") .replace(" ", "_") .replace("-", "_") ) except ValueError: LOGGER.warning( "`new_map_name` was not provided, and neither `accession` " "nor `name` attribute exist: falling back to default " f"name: {HMM_DEFAULT_NAME}" ) new_map_name = HMM_DEFAULT_NAME objs_by_id: dict[str, CT] = {s.id: s for s in objs} self.search(objs_by_id.values()) for hit in self.hits_: obj = objs_by_id[hit.accession.decode("utf-8")] for dom_i, dom in enumerate(hit.domains, start=1): aln = dom.alignment # compute coverages # HMM: - 1 2 3 - 4 - 5 # SEQ: A - A A A A - A # HMM node numbers falling onto valid sequence elements # => - 2 3 - 4 5 num = [hmm_i for seq_i, hmm_i in _enumerate_numbering(aln) if seq_i] # n = the number of valid HMM nodes covered by _seq # => 2 3 4 5 => 4 n = sum(1 for x in num if x is not None) # SEQ coverage 4 / 6 cov_seq = n / len(num) # HMM coverage 4 / M cov_hmm = n / self.hmm.M if not accept_domain(dom, cov_hmm, cov_seq): continue if domain_filter and not domain_filter(dom): continue name = f"{new_map_name}_{dom_i}" offset = obj.start - 1 sub = obj.spawn_child( aln.target_from + offset, aln.target_to + offset, name, **kwargs, ) seq = sub.seq if isinstance(obj, 
(Chain, ChainStructure)) else sub seq.add_seq(new_map_name, num) seq.meta[f"{new_map_name}_pvalue"] = dom.pvalue seq.meta[f"{new_map_name}_score"] = dom.score seq.meta[f"{new_map_name}_bias"] = dom.bias seq.meta[f"{new_map_name}_cov_seq"] = str(cov_seq) seq.meta[f"{new_map_name}_cov_hmm"] = str(cov_hmm) yield sub
class Pfam(AbstractResource):
    """
    A minimalistic Pfam interface.

    * :meth:`fetch` fetches Pfam raw HMM models and associated metadata.
    * :meth:`parse` prepares these data for later usage and stores
      to the filesystem.
    * :meth:`read` loads parsed files.

    Parsed Pfam data is represented as a Pandas DataFrame accessible via
    :meth:`df` with columns: "ID", "Accession", "Description", "Category",
    and "HMM". Each row corresponds to a single model from Pfam-A collection
    and associated metadata taken from the Pfam-A.dat file. HMM models are
    wrapped into a :class:`PyHMMer` instance.

    For quick access to a single HMM model parsed into :class:`PyHMMer`,
    use ``Pfam()[hmm_id]``.
    """
[docs] def __init__( self, resource_path: Path = PFAM_LOC, resource_name: str = "Pfam", ): super().__init__(resource_path, resource_name) self._df = None
@property def df(self) -> pd.DataFrame | None: """ :return: Parsed Pfam if :meth:`read` or :meth:`parse` were called. Otherwise, returns ``None``. """ return self._df @property def dat_columns(self) -> tuple[str, ...]: return "ID", "Accession", "Description", "Category" @df.setter def df(self, value: pd.DataFrame): self._validate_frame(value) self._df = value def __getitem__(self, item: str) -> PyHMMer: path = self.path / "parsed" / "hmm" / f"{item}.hmm.gz" try: return PyHMMer(path) except FileNotFoundError: raise KeyError(f"No HMM with ID {item} found in {path}")
[docs] def fetch( self, url_hmm: str = PFAM_HMM_URL, url_dat: str = PFAM_DAT_URL, ) -> tuple[Path, Path]: """ Fetch Pfam-A data from InterPro. :param url_hmm: URL to "Pfam-A.hmm.gz". :param url_dat: URL to "Pfam-A.hmm.dat.gz" :return: A pair of filepaths for fetched HMM and dat files. """ base = self.path / "raw" base.mkdir(exist_ok=True, parents=True) return ( fetch_to_file(url_hmm, base / PFAM_HMM_NAME), fetch_to_file(url_dat, base / PFAM_DAT_NAME), )
[docs] def parse( self, dump: bool = True, rm_raw: bool = True, ) -> pd.DataFrame: """ Parse fetched raw data into a single pandas :class:`DataFrame`. :param dump: Dump parsed files to :attr:`path` / "raw" dir. :param rm_raw: Clean up the raw data once parsing is done. :return: A parsed Pfam :class:`DataFrame`. See the class's docs for a list of columns. """ path_hmm = self.path / "raw" / PFAM_HMM_NAME path_dat = self.path / "raw" / PFAM_DAT_NAME if not path_hmm.exists(): raise FileNotFoundError(f"Missing raw HMM data in {path_hmm}") if not path_dat.exists(): raise FileNotFoundError(f"Missing raw dat file in {path_dat}") df_hmm = self._parse_hmm(path_hmm) df_dat = self._parse_dat(path_dat) self._df = df_dat.merge(df_hmm, on="Accession") if dump: self.dump() if rm_raw: self.clean(raw=True) return self._df
[docs] def read( self, path: Path | None = None, accessions: abc.Container[str] | None = None, categories: abc.Container[str] | None = None, hmm: bool = True, ) -> pd.DataFrame: """ Read parsed Pfam data. First it reads the "dat" file and filters to relevant accessions and/or categories. Then, if `hmm` is ``True``, it loads each model and wraps into an :class:`PyHMMer` instance. Otherwise, it loads the HMM metadata. One can explore and filter these data, then load the desired HMM models via :meth:`load_hmm`. :param path: A path to the dir with layout similar to what :meth:`dump` creates. :param accessions: A list of Pfam accessions following the ".", e.g., ``["PF00069", ]``. :param categories: A list of Pfam categories to filter the accessions to. :param hmm: Load HMM models. :return: A parsed Pfam :class:`DataFrame`. """ base = path or self.path / "parsed" df = pd.read_csv(base / "dat.csv") if accessions: df = df[df["Accession"].isin(accessions)] if categories: df = df[df["Category"].isin(categories)] if hmm: df = self.load_hmm(df) self._df = df return df
[docs] def load_hmm( self, df: pd.DataFrame | None = None, path: Path | None = None ) -> pd.DataFrame: """ Load HMM models according to accessions in passed `df` and create a column "PyHMMer" with loaded models. :param df: A :class:`DataFrame` having all the :meth:`dat_columns. :param path: A custom path to the parsed data with an "hmm" subdir. :return: A copy of the original :class:`DataFrame` with loaded models. """ base = path or self.path / "parsed" df = self._df if df is None else df if df is None: raise MissingData("No parsed data. Use `read` or `parse` first.") df = df.copy() df["PyHMMer"] = [ PyHMMer(base / "hmm" / f"{acc}.hmm.gz") for acc in df["Accession"] ] return df
[docs] def dump(self, path: Path | None = None) -> Path: """ Store parsed data to the filesystem. This function will store the HMM metadata to attr:`path` / "parsed" / "dat.csv" and separate gzip-compressed HMM models into :attr:`path` / "parsed" / "hmm". :param path: Use this path instead of the :attr:`path` as a base dir. :return: The path :attr:`path` / "parsed". """ base = path or self.path / "parsed" (base / "hmm").mkdir(exist_ok=True, parents=True) if self._df is None: raise MissingData("Missing parsed data to dump") for _, row in self._df.iterrows(): hmm_path = base / "hmm" / f"{row.Accession}.hmm.gz" with gzip.open(hmm_path, "wb") as f: row.HMM.hmm.write(f) self._df[["ID", "Accession", "Description", "Category"]].to_csv( base / "dat.csv", index=False ) return base
[docs] def clean(self, raw: bool = True, parsed: bool = False) -> None: """ Remove Pfam data. If `raw` and `parsed` are both ``False``, removes the :attr:`path` with all stored data. :param raw: Remove raw fetched files. :param parsed: Remove parsed files. :return: Nothing. """ if raw: rmtree(self.path / "raw") elif parsed: rmtree(self.path / "parsed") else: rmtree(self.path)
def _parse_dat(self, path: Path) -> pd.DataFrame: def wrap_chunk(xs: list[str]): _id, _acc, _desc, _, _type = map(lambda x: x.split(" ")[-1], xs[:5]) _acc = _acc.split(".")[0] return _id, _acc, _desc, _type with gzip.open(path, "rt") as f: lines = filter(bool, map(lambda x: x.rstrip(), f)) chunks = filter(bool, split_at(lines, lambda x: x.startswith("# "))) return pd.DataFrame( map(wrap_chunk, chunks), columns=list(self.dat_columns), ) @staticmethod def _parse_hmm(path: Path) -> pd.DataFrame: df = pd.DataFrame({"HMM": list(map(PyHMMer, iter_hmm(path)))}) df["Accession"] = df["HMM"].map( lambda x: x.hmm.accession.decode("utf-8").split(".")[0] ) return df def _validate_frame(self, df: pd.DataFrame): for c in self.dat_columns: if c not in df.columns: raise MissingData(f"Missing required column {c}")
if __name__ == "__main__":
    # This module is a library; executing it directly is an error.
    raise RuntimeError