"""
A module to handle the ancestral tree of the Chain*-type objects defined
by their ``parent``/``children`` attributes and/or ``meta`` info.
"""
import logging
import re
import typing as t
from collections import abc
from itertools import product
import networkx as nx
import rustworkx as rx
from more_itertools import windowed
from toolz import groupby
from lXtractor.chain import Chain, ChainSequence, ChainStructure, ChainList
from lXtractor.core.exceptions import MissingData, FormatError
from lXtractor.util.misc import all_logging_disabled
T = t.TypeVar("T")
CT = t.TypeVar("CT", ChainSequence, ChainStructure, Chain)
CT_: t.TypeAlias = Chain | ChainSequence | ChainStructure
_SEP = "<-("
FILLER = "*"
NODE_PATTERN = re.compile(r"(.+)\|(\d+-\d+)")
LOGGER = logging.getLogger(__name__)
__all__ = (
"list_ancestors_names",
"list_ancestors",
"make_filled",
"make_id_tree",
"recover",
)
def node_name(c: CT_) -> str:
"""
:param c: Chain*-type object.
:return:
"""
return f"{c.name}|{c.start}-{c.end}"
def _check_tree(g: nx.Graph):
if not nx.is_tree(g):
raise ValueError("Obtained graph is not a tree")
[docs]
def list_ancestors_names(id_or_chain: CT_ | str) -> list[str]:
"""
>>> list_ancestors_names('C|1-5<-(C|1-3<-(C|1-2))')
['C|1-3', 'C|1-2']
:param id_or_chain: Chain*-type object or its id.
:return: A list of parents '{name}|{start}-{end}' representations parsed
from the object's id.
"""
if not isinstance(id_or_chain, str):
try:
_id = id_or_chain.meta["id"]
except KeyError as e:
raise MissingData(f"Missing ID property in meta of {id_or_chain}") from e
else:
_id = id_or_chain
obj_ids = []
while _SEP in _id:
_id_sep = _id.split(_SEP)
_id = _SEP.join(_id_sep[1:]).removesuffix(")")
obj_ids.append(_id_sep[0])
obj_ids.append(_id)
return obj_ids[1:]
[docs]
def list_ancestors(c: CT_) -> list[CT_]:
"""
>>> o = ChainSequence.from_string('x' * 5, 1, 5, 'C')
>>> c13 = o.spawn_child(1, 3)
>>> c12 = c13.spawn_child(1, 2)
>>> list_ancestors(c12)
[C|1-3<-(C|1-5), C|1-5]
:param c: Chain*-type object.
:return: A list ancestor objects obtained from the ``parent`` attribute..
"""
if c.parent is None:
return []
parents = []
while c.parent is not None:
parents.append(c.parent)
c = c.parent
return parents
[docs]
def make_filled(name: str, _t: CT | t.Type[CT]) -> CT:
"""
Make a "filled" version of an object to occupy the tree.
:param name: Name of the node obtained via :func:`node_name`.
:param _t: Some Chain*-type object.
:return: An object with filled sequence. If it's a
:class:`ChainStructure <lXtractor.core.chain.structure.ChainStructure>`
object, it will have an empty structure.
"""
re_find = NODE_PATTERN.findall(name)
if len(re_find) != 1:
raise FormatError(
f"Failed to parse name {name} into {{name}}|{{start}}-{{end}} format. "
f"re.findall results: {re_find}"
)
match = re_find[0]
if len(match) != 2:
raise FormatError(
f"Unexpected match from {name}. Expected to find exactly two items: "
f"name and boundaries. Found {len(match)}: {match}"
)
real_name, bounds = match
start, end = map(int, bounds.split("-"))
if start == end == 0:
if issubclass(_t, (Chain, ChainStructure, ChainSequence)):
return _t.make_empty()
return _t.__class__.make_empty()
size = end - start + 1
seq = ChainSequence.from_string(size * FILLER, start, end, real_name)
is_chain_instance = (
isinstance(_t, Chain),
isinstance(_t, ChainSequence),
isinstance(_t, ChainStructure),
)
if any(is_chain_instance):
if is_chain_instance[0]:
return Chain.from_seq(seq)
if is_chain_instance[1]:
return seq
if is_chain_instance[2]:
return ChainStructure(None, seq=seq)
raise RuntimeError("...")
try:
is_chain_subclass = (
issubclass(_t, Chain),
issubclass(_t, ChainSequence),
issubclass(_t, ChainStructure),
)
except TypeError as e:
raise TypeError(f"Failed to infer type of {_t}") from e
if is_chain_subclass[0]:
return Chain.from_seq(seq)
if is_chain_subclass[1]:
return seq
if is_chain_subclass[2]:
return ChainStructure(None, seq=seq)
raise RuntimeError("...")
# def make_obj_tree(
# chains: abc.Iterable[CT], connect: bool = False, check_is_tree: bool = True
# ) -> nx.DiGraph:
# """
# Make an ancestral tree -- a directed graph representing ancestral
# relationships between chains. The nodes of the tree are Chain*-type
# objects. Hence, they must be hashable. This restricts types of sequences
# valid for :class:`ChainSequence <lXtractor.core.chain.sequence.
# ChainSequence>` to ``abc.Sequence[abc.Hashable]``.
#
# As a useful side effect, this function can aid in filling the gaps in the
# actual tree indicated by the id-relationship suggested by the "id" field
# of the ``meta`` property. In other words, if a segment S|1-2 was obtained
# by spawning from S|1-5, S|1-2's id will reflect this:
#
# >>> s = make_filled('S|1-5', ChainSequence.make_empty())
# >>> c12 = s.spawn_child(1, 2)
# >>> c12
# S|1-2<-(S|1-5)
#
# However, if S|1-5 was lost (e.g., by writing/reading S|1-2 to/from disk),
# and S|1-2.parent is None, we can use ID stored in meta to recover ancestral
# relationships. This function will attend to such cases and create a filler
# object S|1-5 with a "*"-filled sequence.
#
# >>> c12.parent = None
# >>> c12
# S|1-2
# >>> c12.meta['id']
# 'S|1-2<-(S|1-5)'
# >>> ct = make_obj_tree([c12],connect=True)
# >>> assert len(ct.nodes) == 2
# >>> [n.id for n in ct.nodes]
# ['S|1-2<-(S|1-5)', 'S|1-5']
#
# :param chains: A homogeneous iterable of Chain*-type objects.
# :param connect: If ``True``, connect both supplied and created filler
# objects via ``children`` and ``parent`` attributes.
# :param check_is_tree: If ``True``, check if the obtained graph is actually
# a tree. If it's not, raise ``ValueError``.
# :return: A networkx's directed graph with Chain*-type objects as nodes.
# """
# if not isinstance(chains, ChainList):
# chains = ChainList(chains)
# tree = nx.DiGraph()
# if len(chains) == 0:
# return tree
#
# # Populate objects' tree
# for c in chains:
# if not tree.has_node(c):
# tree.add_node(c)
# parents = list_ancestors(c)
# if parents:
# # fails to recognize parents as iterable
# for child, parent in windowed([c, *parents], 2): # type: ignore
# tree.add_edge(parent, child)
#
# # Make tree fully connected and populate `parent`, `children` attributes
# name2node = {node_name(n): n for n in tree.nodes}
# node_example = chains[0]
# node: CT
# for node in list(tree.nodes):
# names = [node_name(node), *list_ancestors_names(node)]
# for child_name, parent_name in windowed(names, 2):
# assert child_name is not None
# if parent_name is None:
# continue
# parent_obj = name2node.get(
# parent_name, make_filled(parent_name, node_example)
# )
# name2node[parent_name] = parent_obj
# child_obj = name2node[child_name]
# tree.add_edge(parent_obj, child_obj)
# if connect:
# if child_obj not in parent_obj.children:
# parent_obj.children.append(child_obj)
# child_obj.parent = parent_obj
#
# if check_is_tree:
# _check_tree(tree)
#
# return tree
def _connect(_child_name: str, _parent_name: str, _tree: nx.DiGraph):
child_objs = _tree.nodes[_child_name]["objs"]
parent_objs = _tree.nodes[_parent_name]["objs"]
for _child_obj, _parent_obj in product(child_objs, parent_objs):
if (
_child_obj.parent is None
and _parent_name in _child_obj.meta["id"]
or _parent_name in _child_obj.id
):
_child_obj.parent = _parent_obj
child_ids = [c.id for c in _parent_obj.children]
if _child_obj.id not in child_ids:
_parent_obj.children.append(_child_obj)
[docs]
def make_id_tree(
chains: abc.Iterable[CT_], connect: bool = False, check_is_tree: bool = True
) -> nx.DiGraph:
"""
A computationally cheaper alternative to :func:`make_obj_tree`, where
nodes are string objects, while actual objects reside in a node attribute
"objs". It allows for a faster tree construction since it avoids expensive
hashing of Chain*-type objects.
:param chains: An iterable of Chain*-type objects.
:param connect: If ``True``, connect both supplied and created filler
objects via ``children`` and ``parent`` attributes.
:param check_is_tree: If ``True``, check if the obtained graph is actually
a tree. If it's not, raise ``ValueError``.
:return: A networkx's directed graph.
"""
if not isinstance(chains, ChainList):
chains = ChainList(chains)
tree = nx.DiGraph()
if len(chains) == 0:
return tree
node_example = chains[0]
name2chains: dict[str, list[CT]] = groupby(node_name, chains)
for name in name2chains:
tree.add_node(name, objs=[])
for name, chains_group in name2chains.items():
tree.add_node(name, objs=chains_group)
for node in list(tree.nodes):
for obj in tree.nodes[node]["objs"]:
names = [node_name(obj), *list_ancestors_names(obj)]
for child_name, parent_name in windowed(names, 2):
assert child_name is not None
if parent_name is None:
continue
if parent_name not in tree.nodes:
with all_logging_disabled():
parent_obj = make_filled(parent_name, node_example)
tree.add_node(parent_name, objs=[parent_obj])
tree.add_edge(parent_name, child_name)
if connect:
_connect(child_name, parent_name, tree)
if check_is_tree:
_check_tree(tree)
return tree
# def make(
# chains: abc.Iterable[CT_],
# connect: bool = False,
# objects: bool = False,
# check_is_tree: bool = True,
# ) -> nx.DiGraph:
# """
# Make an ancestral tree -- a directed graph representing ancestral
# relationships between chains.
#
# :param chains: An iterable of Chain*-type objects.
# :param connect: Connect actual objects by populating ``.children`` and
# ``.parent`` attributes.
# :param objects: Create an object tree using :func:`make_obj_tree`.
# Otherwise, create a "string" tree using :func:`make_str_tree`.
# Check the docs of these functions to understand the differences.
# :param check_is_tree: If ``True``, check if the obtained graph is actually
# a tree. If it's not, raise ``ValueError``.
# :return:
# """
# if objects:
# return make_obj_tree(chains, connect, check_is_tree)
# return make_str_tree(chains, connect, check_is_tree)
[docs]
def recover(c: CT_) -> CT_:
"""
Recover ancestral relationships of a Chain*-type object.
This will use :func:`make_str_tree` to recover ancestors from object IDs of
an object itself and any encompassed children.
..note ::
It may be used as a callback in :meth:`lXtractor.chain.io.ChainIO.read`
..note ::
:func:`make_str_tree` creates "filled" parents via :func:`make_filled`
:param c: A Chain*-type object.
:return: The same object with populated ``children`` and ``parent``
attributes.
"""
all_chains = ChainList([c, *c.children.collapse()])
make_id_tree(all_chains.iter_sequences(), connect=True, check_is_tree=False)
make_id_tree(all_chains.iter_structures(), connect=True, check_is_tree=False)
make_id_tree(
all_chains.iter_structure_sequences(), connect=True, check_is_tree=False
)
if isinstance(c, Chain):
make_id_tree(all_chains, connect=True, check_is_tree=False)
return c
if __name__ == "__main__":
raise RuntimeError