"""
A module to handle the ancestral tree of the Chain*-type objects defined
by their ``parent``/``children`` attributes and/or ``meta`` info.
"""
import logging
import re
import typing as t
from collections import abc
from itertools import product
import networkx as nx
from more_itertools import windowed
from toolz import groupby
from lXtractor.chain import Chain, ChainSequence, ChainStructure, ChainList
from lXtractor.core.exceptions import MissingData, FormatError
from lXtractor.util.misc import all_logging_disabled
# Generic type variable (not referenced by the definitions visible in this file).
T = t.TypeVar("T")
# Type variable constrained to the three supported Chain*-type classes.
CT = t.TypeVar("CT", ChainSequence, ChainStructure, Chain)
# Union alias meaning "any Chain*-type object".
CT_: t.TypeAlias = Chain | ChainSequence | ChainStructure
# Separator placed between an object's own name and its parent's ID inside
# an ID string, e.g. ``C|1-2<-(C|1-5)``; ``list_ancestors_names`` splits on it.
_SEP = "<-("
# Character used by ``make_filled`` to fill sequences of placeholder objects.
FILLER = "*"
# Splits a node name formatted as ``{name}|{start}-{end}`` into two groups:
# the name and the ``start-end`` boundaries.
NODE_PATTERN = re.compile(r"(.+)\|(\d+-\d+)")
LOGGER = logging.getLogger(__name__)
# Names exported as the public API of the module.
__all__ = (
    "list_ancestors_names",
    "list_ancestors",
    "make",
    "make_filled",
    "make_str_tree",
    "make_obj_tree",
    "recover",
)
def node_name(c: CT_) -> str:
    """
    Compose the label used to identify a Chain*-type object within a tree.

    :param c: Chain*-type object.
    :return: A string formatted as ``{name}|{start}-{end}``.
    """
    label, first, last = c.name, c.start, c.end
    return f"{label}|{first}-{last}"
def _check_tree(g: nx.Graph):
    """
    Verify that a graph is a tree.

    :param g: A networkx graph to validate.
    :raises ValueError: If ``nx.is_tree`` reports that ``g`` is not a tree.
    """
    if nx.is_tree(g):
        return
    raise ValueError("Obtained graph is not a tree")
[docs]
def list_ancestors_names(id_or_chain: CT_ | str) -> list[str]:
    """
    List the names of all ancestors encoded in an object's ID.

    >>> list_ancestors_names('C|1-5<-(C|1-3<-(C|1-2))')
    ['C|1-3', 'C|1-2']

    :param id_or_chain: Chain*-type object or its id.
    :return: A list of parents '{name}|{start}-{end}' representations parsed
        from the object's id. The object's own name (the first element of
        the id) is not included.
    :raises MissingData: If a Chain*-type object is passed and its ``meta``
        has no ``"id"`` key.
    """
    if isinstance(id_or_chain, str):
        full_id = id_or_chain
    else:
        try:
            full_id = id_or_chain.meta["id"]
        except KeyError as e:
            raise MissingData(f"Missing ID property in meta of {id_or_chain}") from e

    segments = full_id.split(_SEP)
    # Each separator opens one bracket, and all of them are closed at the very
    # end of the id. Peel at most one closing bracket per separator off the
    # innermost (last) segment; other segments are left as they are.
    innermost = segments[-1]
    for _ in range(len(segments) - 1):
        innermost = innermost.removesuffix(")")
    segments[-1] = innermost
    # The first segment is the object itself, not an ancestor.
    return segments[1:]
[docs]
def list_ancestors(c: CT_) -> list[CT_]:
    """
    Collect the ancestors of an object by following ``parent`` links.

    >>> o = ChainSequence.from_string('x' * 5, 1, 5, 'C')
    >>> c13 = o.spawn_child(1, 3)
    >>> c12 = c13.spawn_child(1, 2)
    >>> list_ancestors(c12)
    [C|1-3<-(C|1-5), C|1-5]

    :param c: Chain*-type object.
    :return: A list of ancestor objects obtained from the ``parent``
        attribute, ordered from the closest parent to the most distant one.
        Empty if the object has no parent.
    """
    lineage = []
    cursor = c.parent
    while cursor is not None:
        lineage.append(cursor)
        cursor = cursor.parent
    return lineage
[docs]
def make_filled(name: str, _t: CT | t.Type[CT]) -> CT:
    """
    Make a "filled" version of an object to occupy the tree.

    :param name: Name of the node obtained via :func:`node_name`.
    :param _t: Some Chain*-type object or a Chain*-type class.
    :return: An object with filled sequence. If it's a
        :class:`ChainStructure <lXtractor.core.chain.structure.ChainStructure>`
        object, it will have an empty structure. If the parsed boundaries are
        ``0-0``, the result of ``make_empty()`` called on the class of ``_t``.
    :raises FormatError: If ``name`` can't be parsed into the
        ``{name}|{start}-{end}`` format.
    :raises TypeError: If ``_t`` is an instance of an unsupported type.
    :raises RuntimeError: If ``_t`` is a class that is not a subclass of any
        supported Chain*-type.
    """
    re_find = NODE_PATTERN.findall(name)
    if len(re_find) != 1:
        raise FormatError(
            f"Failed to parse name {name} into {{name}}|{{start}}-{{end}} format. "
            f"re.findall results: {re_find}"
        )
    match = re_find[0]
    if len(match) != 2:
        raise FormatError(
            f"Unexpected match from {name}. Expected to find exactly two items: "
            f"name and boundaries. Found {len(match)}: {match}"
        )
    real_name, bounds = match
    start, end = map(int, bounds.split("-"))

    # ``_t`` may be either a class or an instance. Resolve the class once so
    # that ``issubclass`` is never called on an instance (which raises a bare
    # ``TypeError``) and both cases share a single dispatch below.
    is_class = isinstance(_t, type)
    cls = _t if is_class else type(_t)
    if not issubclass(cls, (Chain, ChainSequence, ChainStructure)):
        if is_class:
            raise RuntimeError(
                f"Unsupported type {cls}: expected a subclass of Chain, "
                f"ChainSequence, or ChainStructure"
            )
        raise TypeError(f"Failed to infer type of {_t}")

    if start == end == 0:
        # Zero boundaries designate an empty object of the requested type.
        return cls.make_empty()

    size = end - start + 1
    seq = ChainSequence.from_string(size * FILLER, start, end, real_name)
    if issubclass(cls, Chain):
        return Chain.from_seq(seq)
    if issubclass(cls, ChainSequence):
        return seq
    # The only remaining supported option: a structure without coordinates.
    return ChainStructure(None, seq=seq)
[docs]
def make_obj_tree(
    chains: abc.Iterable[CT], connect: bool = False, check_is_tree: bool = True
) -> nx.DiGraph:
    """
    Make an ancestral tree -- a directed graph representing ancestral
    relationships between chains. The nodes of the tree are Chain*-type
    objects. Hence, they must be hashable. This restricts types of sequences
    valid for :class:`ChainSequence <lXtractor.core.chain.sequence.
    ChainSequence>` to ``abc.Sequence[abc.Hashable]``.

    As a useful side effect, this function can aid in filling the gaps in the
    actual tree indicated by the id-relationship suggested by the "id" field
    of the ``meta`` property. In other words, if a segment S|1-2 was obtained
    by spawning from S|1-5, S|1-2's id will reflect this:

    >>> s = make_filled('S|1-5', ChainSequence.make_empty())
    >>> c12 = s.spawn_child(1, 2)
    >>> c12
    S|1-2<-(S|1-5)

    However, if S|1-5 was lost (e.g., by writing/reading S|1-2 to/from disk),
    and S|1-2.parent is None, we can use ID stored in meta to recover ancestral
    relationships. This function will attend to such cases and create a filler
    object S|1-5 with a "*"-filled sequence.

    >>> c12.parent = None
    >>> c12
    S|1-2
    >>> c12.meta['id']
    'S|1-2<-(S|1-5)'
    >>> ct = make_obj_tree([c12],connect=True)
    >>> assert len(ct.nodes) == 2
    >>> [n.id for n in ct.nodes]
    ['S|1-2<-(S|1-5)', 'S|1-5']

    :param chains: A homogeneous iterable of Chain*-type objects.
    :param connect: If ``True``, connect both supplied and created filler
        objects via ``children`` and ``parent`` attributes.
    :param check_is_tree: If ``True``, check if the obtained graph is actually
        a tree. If it's not, raise ``ValueError``.
    :return: A networkx's directed graph with Chain*-type objects as nodes.
    """
    if not isinstance(chains, ChainList):
        chains = ChainList(chains)
    tree = nx.DiGraph()
    if len(chains) == 0:
        return tree

    # Populate objects' tree using the existing ``parent`` links.
    for c in chains:
        if not tree.has_node(c):
            tree.add_node(c)
        parents = list_ancestors(c)
        if parents:
            # fails to recognize parents as iterable
            for child, parent in windowed([c, *parents], 2):  # type: ignore
                tree.add_edge(parent, child)

    # Make tree fully connected and populate `parent`, `children` attributes
    name2node = {node_name(n): n for n in tree.nodes}
    node_example = chains[0]
    node: CT
    # Iterate over a snapshot: filler parents are added to the tree below.
    for node in list(tree.nodes):
        names = [node_name(node), *list_ancestors_names(node)]
        for child_name, parent_name in windowed(names, 2):
            assert child_name is not None
            if parent_name is None:
                # ``windowed`` pads a single-element input with ``None``.
                continue
            # Create a filler only when the parent is actually missing.
            # Passing it as the ``dict.get`` default would build (and discard)
            # a new object on every lookup, even for known parents.
            if parent_name in name2node:
                parent_obj = name2node[parent_name]
            else:
                parent_obj = make_filled(parent_name, node_example)
                name2node[parent_name] = parent_obj
            child_obj = name2node[child_name]
            tree.add_edge(parent_obj, child_obj)
            if connect:
                if child_obj not in parent_obj.children:
                    parent_obj.children.append(child_obj)
                child_obj.parent = parent_obj

    if check_is_tree:
        _check_tree(tree)
    return tree
def _connect(_child_name: str, _parent_name: str, _tree: nx.DiGraph):
    """
    Link objects stored under two nodes of a "string" tree via their
    ``parent`` and ``children`` attributes.

    Every (child, parent) pair of objects taken from the ``"objs"`` attribute
    of the two nodes is considered. A pair is linked when either the child
    has no parent and the parent's name occurs in the child's ``meta["id"]``,
    or the parent's name occurs in the child's ``id``.

    :param _child_name: Name of the child node.
    :param _parent_name: Name of the parent node.
    :param _tree: A graph whose nodes hold objects in the ``"objs"`` attribute.
    :return: Nothing; objects are modified in place.
    """
    kids = _tree.nodes[_child_name]["objs"]
    elders = _tree.nodes[_parent_name]["objs"]
    for kid, elder in product(kids, elders):
        # ``and`` binds tighter than ``or``; the grouping is spelled out.
        orphan_mentions_parent = (
            kid.parent is None and _parent_name in kid.meta["id"]
        )
        if not (orphan_mentions_parent or _parent_name in kid.id):
            continue
        kid.parent = elder
        known_ids = [existing.id for existing in elder.children]
        if kid.id not in known_ids:
            elder.children.append(kid)
[docs]
def make_str_tree(
    chains: abc.Iterable[CT_], connect: bool = False, check_is_tree: bool = True
) -> nx.DiGraph:
    """
    A computationally cheaper alternative to :func:`make_obj_tree`, where
    nodes are string objects, while actual objects reside in a node attribute
    "objs". It allows for a faster tree construction since it avoids expensive
    hashing of Chain*-type objects.

    :param chains: An iterable of Chain*-type objects.
    :param connect: If ``True``, connect both supplied and created filler
        objects via ``children`` and ``parent`` attributes.
    :param check_is_tree: If ``True``, check if the obtained graph is actually
        a tree. If it's not, raise ``ValueError``.
    :return: A networkx's directed graph.
    """
    if not isinstance(chains, ChainList):
        chains = ChainList(chains)
    tree = nx.DiGraph()
    if len(chains) == 0:
        return tree

    node_example = chains[0]
    # Objects sharing the same ``{name}|{start}-{end}`` label share a node.
    name2chains: dict[str, list[CT]] = groupby(node_name, chains)
    # A single pass suffices: a preliminary insertion with an empty ``objs``
    # list would be overwritten by this one right away.
    for name, chains_group in name2chains.items():
        tree.add_node(name, objs=chains_group)

    # Iterate over a snapshot: filler parents are added to the tree below.
    for node in list(tree.nodes):
        for obj in tree.nodes[node]["objs"]:
            names = [node_name(obj), *list_ancestors_names(obj)]
            for child_name, parent_name in windowed(names, 2):
                assert child_name is not None
                if parent_name is None:
                    # ``windowed`` pads a single-element input with ``None``.
                    continue
                if parent_name not in tree.nodes:
                    # The parent is known only by name: create a placeholder.
                    with all_logging_disabled():
                        parent_obj = make_filled(parent_name, node_example)
                    tree.add_node(parent_name, objs=[parent_obj])
                tree.add_edge(parent_name, child_name)
                if connect:
                    _connect(child_name, parent_name, tree)

    if check_is_tree:
        _check_tree(tree)
    return tree
[docs]
def make(
    chains: abc.Iterable[CT_],
    connect: bool = False,
    objects: bool = False,
    check_is_tree: bool = True,
) -> nx.DiGraph:
    """
    Make an ancestral tree -- a directed graph representing ancestral
    relationships between chains.

    This is a dispatcher choosing between :func:`make_obj_tree` and
    :func:`make_str_tree`.

    :param chains: An iterable of Chain*-type objects.
    :param connect: Connect actual objects by populating ``.children`` and
        ``.parent`` attributes.
    :param objects: Create an object tree using :func:`make_obj_tree`.
        Otherwise, create a "string" tree using :func:`make_str_tree`.
        Check the docs of these functions to understand the differences.
    :param check_is_tree: If ``True``, check if the obtained graph is actually
        a tree. If it's not, raise ``ValueError``.
    :return: A networkx's directed graph produced by the selected function.
    """
    builder = make_obj_tree if objects else make_str_tree
    return builder(chains, connect, check_is_tree)
[docs]
def recover(c: CT_) -> CT_:
    """
    Recover ancestral relationships of a Chain*-type object.

    This will use :func:`make_str_tree` to recover ancestors from object IDs of
    an object itself and any encompassed children.

    .. note::
        It may be used as a callback in :meth:`lXtractor.chain.io.ChainIO.read`

    .. note::
        :func:`make_str_tree` creates "filled" parents via :func:`make_filled`

    :param c: A Chain*-type object.
    :return: The same object with populated ``children`` and ``parent``
        attributes.
    """
    all_chains = ChainList([c, *c.children.collapse()])
    # Each kind of object is connected separately. The iterators are obtained
    # one at a time, right before the corresponding tree is built.
    collectors = (
        all_chains.iter_sequences,
        all_chains.iter_structures,
        all_chains.iter_structure_sequences,
    )
    for collect in collectors:
        make_str_tree(collect(), connect=True, check_is_tree=False)
    if isinstance(c, Chain):
        make_str_tree(all_chains, connect=True, check_is_tree=False)
    return c
# The module is meant to be imported only: running it as a script
# raises ``RuntimeError``.
if __name__ == "__main__":
    raise RuntimeError