"""
Module defines a segment object serving as base class for sequences
in lXtractor.
"""
from __future__ import annotations
import logging
import operator as op
import typing as t
import warnings
from collections import namedtuple, abc
from copy import deepcopy, copy
from itertools import combinations, filterfalse, chain
import networkx as nx
from more_itertools import always_reversible, powerset, take, nth, unique_everseen
from tqdm.auto import tqdm
from lXtractor.core.base import Ord, NamedTupleT
from lXtractor.core.config import DefaultConfig
from lXtractor.core.exceptions import (
LengthMismatch,
NoOverlap,
OverlapError,
FormatError,
)
from lXtractor.util.misc import is_valid_field_name
if t.TYPE_CHECKING:
from lXtractor.variables.base import Variables
# Type variable bound to ``Segment`` and declared contravariant.
_S = t.TypeVar("_S", bound="Segment", contravariant=True)
# Generic placeholder used by the aliases below and by ``_translate_idx``.
T = t.TypeVar("T")
# A callable taking two sequences and returning a single (joined) sequence.
_Joiner: t.TypeAlias = abc.Callable[[abc.Sequence[T], abc.Sequence[T]], abc.Sequence[T]]
# A callable taking a size and returning a sequence (used to pad missing data).
_Filler: t.TypeAlias = abc.Callable[[int], abc.Sequence[t.Any]]
# _IterType = t.Union[abc.Iterator[tuple], abc.Iterator[namedtuple]]
# Values of ``handle_mode`` accepted by ``Segment.overlap_with``.
DATA_HANDLE_MODES = ("merge", "self", "other")
# Symbols that ``Segment._parse_name`` refuses to see in a segment's name.
DISALLOWED_NAME_SYMBOLS = ("|", "/", "\\")
# Module-level logger.
LOGGER = logging.getLogger(__name__)
class _Item(t.Protocol):
    """
    Structural type of an item factory: any callable that accepts arbitrary
    positional/keyword arguments and returns a namedtuple-like object.
    """

    def __call__(self, *args, **kwargs) -> NamedTupleT:
        """Create an item from the supplied values."""
def _check_boundary_change(x_orig: int, x_new: int):
    """
    Verify that a boundary may be changed from `x_orig` to `x_new`.

    :param x_orig: The current value of the boundary.
    :param x_new: The requested value of the boundary.
    :raises IndexError: If a non-zero boundary is being set to zero, if the
        current boundary is zero (the segment is empty), or if the new
        boundary is negative.
    """
    zeroing_non_empty = x_new == 0 and x_orig != 0
    if zeroing_non_empty:
        raise IndexError(f"Can't change none-zero coordinate {x_orig} to zero")
    if x_orig == 0:
        # Zero boundaries are reserved for empty segments, which can't change.
        raise IndexError("Can't change boundaries of an empty segment")
    if x_new < 0:
        raise IndexError(f"Attempting to set a negative boundary {x_new}")
[docs]
class Segment(abc.Sequence[NamedTupleT]):
"""
An arbitrary segment with inclusive boundaries containing arbitrary number
of sequences.
Sequences themselves may be retrieved via ``[]`` syntax:
>>> s = Segment(1, 10, 'S', seqs={'X': list(range(10))})
>>> s.id == 'S|1-10'
True
>>> s['X'] == list(range(10))
True
>>> 'X' in s
True
One can use the same syntax to check if a Segment contains certain index:
>>> 1 in s and 10 in s and not 11 in s
True
Iteration over the segment yields its items:
>>> next(iter(s))
Item(i=1, X=0)
One can just get the same item by explicit index:
>>> s[1]
Item(i=1, X=0)
Slicing returns an iterable slice object:
>>> list(s[1:2])
[Item(i=1, X=0), Item(i=2, X=1)]
One can add a new sequence in two ways.
1) using a method:
>>> s.add_seq('Y', tuple(range(10, 20)))
>>> 'Y' in s
True
2) using ``[]`` syntax:
>>> s['Y'] = tuple(range(10, 20))
>>> 'Y' in s
True
Note that using the first method, if ``s`` already contains ``Y``,
this will cause an exception. To overwrite a sequence with the same name,
please use explicit ``[]`` syntax.
Additionally, one can offset Segment indices using ``>>``/``<<`` syntax.
This operation returns a new, shifted Segment; the original boundaries stay unchanged.
>>> s >> 1
S|2-11
>>> 11 in (s >> 1)
True
"""
# Fixed set of instance attributes; all of them are assigned in ``__init__``.
__slots__ = (
    "_start",  # start coordinate, exposed via the ``start`` property
    "_end",  # end coordinate, exposed via the ``end`` property
    "_name",  # segment's name, exposed via the ``name`` property
    "_parent",  # parent segment or ``None``, exposed via ``parent``
    "_id",  # cached identifier produced by ``_make_id``
    "_seqs",  # mapping: sequence name => sequence
    "children",  # child segments
    "meta",  # dictionary with meta-information
    "variables",  # collection of variables
)
[docs]
def __init__(
    self,
    start: int,
    end: int,
    name: str = "S",
    seqs: dict[str, abc.Sequence[t.Any]] | None = None,
    parent: t.Self | None = None,
    children: abc.MutableSequence[t.Self] | None = None,
    meta: dict[str, t.Any] | None = None,
    variables: Variables | None = None,
):
    """
    :param start: Start coordinate (inclusive).
    :param end: End coordinate (inclusive).
    :param name: The name of the segment. Together with the boundaries
        (and the parent's ID, if any) it is used to construct :meth:`id`.
        It is converted to ``str`` and must not contain any of the
        disallowed symbols.
    :param seqs: A dictionary name => sequence, where each sequence has
        the same length as the segment.
    :param parent: Parental segment of this instance, if any.
    :param children: A mutable sequence of child segments.
    :param meta: A dictionary with arbitrary meta-information.
    :param variables: A collection of variables for this segment.
    :raises ValueError: If one of the boundaries is not positive, unless
        ``start == end``.

    .. note::
        Falsy `seqs`, `children`, `meta`, and `variables` (including empty
        containers) are replaced by newly created empty containers.
    """
    has_non_positive = start <= 0 or end <= 0
    if has_non_positive and start != end:
        raise ValueError("Boundaries must start from 1")
    self._start, self._end = start, end
    self._name = self._parse_name(name)
    self._parent = parent
    self.children = children if children else []
    self.meta: dict[str, t.Any] = meta if meta else {}
    self._seqs: dict[str, abc.Sequence[t.Any]] = seqs if seqs else {}
    # The ID relies on the name, boundaries and parent assigned above.
    self._id = self._make_id()
    # Imported here: at the module level, this name is only imported
    # under ``TYPE_CHECKING``.
    from lXtractor.variables.base import Variables

    self.variables: Variables = variables if variables else Variables()
    self._setup_and_validate()
@staticmethod
def _parse_name(s: t.Any):
    """
    Convert a name to ``str`` and check it for disallowed symbols.

    :param s: Anything convertible to ``str``.
    :return: The string version of `s`.
    :raises NameError: If the name contains any symbol listed in
        ``DISALLOWED_NAME_SYMBOLS``; the first such symbol is reported.
    """
    text = str(s)
    offender = next((sym for sym in DISALLOWED_NAME_SYMBOLS if sym in text), None)
    if offender is not None:
        raise NameError(f"Name {text} contains disallowed symbol {offender}")
    return text
@property
def name(self) -> str:
    """
    :return: The segment's name.
    """
    return self._name
@name.setter
def name(self, value: str):
    """
    Set a new name and refresh the segment's ID.

    The name goes through the same validation as in the constructor, so
    that symbols reserved for the ID format can't sneak in via the setter.

    :param value: A new name.
    :raises TypeError: If `value` is not a ``str``.
    :raises NameError: If `value` contains a disallowed symbol.
    """
    if not isinstance(value, str):
        raise TypeError("Name must have the type `str`")
    self._name = self._parse_name(value)
    self._id = self._make_id()
@property
def start(self) -> int:
    """
    :return: The start coordinate of this segment.
    """
    return self._start
@start.setter
def start(self, value: int):
    """
    Move the start coordinate.

    Increasing the start trims the beginning of each stored sequence.
    Decreasing it is only possible if the segment holds no sequences.
    The ID is refreshed afterward.

    :param value: A new start coordinate.
    :raises IndexError: If the boundary change is invalid, `value` exceeds
        the current end, or the start is decreased while sequences exist.
    """
    _check_boundary_change(self.start, value)
    if value > self.end:
        raise IndexError(
            f"Cannot start {value} further than the current end {self.end}"
        )
    if value > self.start:
        offset = _translate_idx(value, self.start)
        self._seqs = {key: seq[offset:] for key, seq in self._seqs.items()}
        self._start = value
        self._validate_seqs()
    elif value < self.start:
        if self._seqs:
            # There is nothing to extend existing sequences with.
            raise IndexError(
                f"Cannot set start {self.start} to {value} with "
                "existing sequences"
            )
        self._start = value
    self._id = self._make_id()
@property
def end(self) -> int:
    """
    :return: The end coordinate of this segment.
    """
    return self._end
@end.setter
def end(self, value: int):
    """
    Move the end coordinate.

    Decreasing the end trims the tail of each stored sequence. Increasing
    it is only possible if the segment holds no sequences. The ID is
    refreshed afterward.

    :param value: A new end coordinate.
    :raises IndexError: If the boundary change is invalid, `value` is lower
        than the current start, or the end is increased while sequences
        exist.
    """
    _check_boundary_change(self.end, value)
    if value < self.start:
        raise IndexError(
            f"Cannot set end {value} lower than the current start {self.start}"
        )
    if value < self.end:
        # +1 since `value` itself stays within the segment.
        cutoff = _translate_idx(value, self.start) + 1
        self._seqs = {key: seq[:cutoff] for key, seq in self._seqs.items()}
        self._end = value
        self._validate_seqs()
    elif value > self.end:
        if self._seqs:
            # There is nothing to extend existing sequences with.
            raise IndexError(
                f"Cannot set end {self.end} to {value} with existing sequences"
            )
        self._end = value
    self._id = self._make_id()
@property
def parent(self) -> t.Self | None:
    """
    :return: The parent of this segment or ``None`` if there is none.
    """
    return self._parent
@parent.setter
def parent(self, parent: t.Self | None):
    """
    Assign a new parent and refresh the segment's ID.

    :param parent: ``None`` or an instance of this segment's type.
    :raises TypeError: If `parent` is of any other type.
    """
    if parent is not None and not isinstance(parent, type(self)):
        raise TypeError(
            f"Parent must be of the same type {type(self)}. "
            f"Got {type(parent)} instead"
        )
    self._parent = parent
    self._id = self._make_id()
def _make_id(self):
    """
    Compose an identifier from the name, boundaries, and parent's ID.

    :return: A string ``{name}{separator}{start}-{end}`` followed by
        ``<-({parent_id})`` if the parent is truthy.
    """
    # NOTE(review): the parent is tested for truthiness, so an empty parent
    # segment (having zero length) is left out of the ID.
    suffix = f"<-({self.parent.id})" if self.parent else ""
    separator = DefaultConfig["separators"]["start_end"]
    return f"{self.name}{separator}{self.start}-{self.end}{suffix}"
@property
def id(self) -> str:
    """
    :return: Segment's identifier encapsulating its name, boundaries, and
        parents (if the segment has any). For example::

            S|1-2<-(P|1-10)

        specifies a segment `S` with boundaries ``[1, 2]`` whose parent
        is `P`. The value is cached and refreshed by the setters of the
        name, boundaries, and parent.
    """
    return self._id
[docs]
def id_strip_parents(self):
    """
    :return: The part of the segment's identifier preceding the first
        ``"<-"``, i.e., the ID without parent information.
    """
    own_part, _, _ = self.id.partition("<-")
    return own_part
@property
def item_type(self) -> _Item:
    """
    A factory making an ``Item`` namedtuple. Its first field is ``"i"``
    (an index), followed by the names of the sequences stored in this
    instance.

    :return: A freshly created ``Item`` namedtuple type.
    """
    field_names = ["i"]
    field_names.extend(self._seqs)
    # Returns Type[Tuple[Any, ...]]
    return namedtuple("Item", field_names)  # type: ignore
@property
def is_empty(self) -> bool:
    """
    :return: ``True`` if the segment is empty, which is a special case of
        both boundaries being zero (``start == end == 0``).
    """
    return self.start == 0 and self.end == 0
@property
def is_singleton(self) -> bool:
    """
    :return: ``True`` if the boundaries coincide (``start == end``), i.e.,
        the segment spans a single position.
    """
    first, last = self.start, self.end
    return first == last
@property
def seq_names(self) -> list[str]:
    """
    :return: A new list with the names of the stored sequences.
    """
    return [*self._seqs]
def __str__(self) -> str:
    """
    :return: The segment's :meth:`id`.
    """
    return self.id
def __repr__(self) -> str:
    """
    :return: The segment's :meth:`id` (same as ``str()``).
    """
    return self.id
def __iter__(self) -> abc.Iterator[NamedTupleT]:
if self.is_empty:
return iter([])
item_type = self.item_type
enum = range(self.start, self.end + 1)
if self._seqs:
return (
item_type(i, *x)
for i, x in zip(enum, zip(*self._seqs.values()), strict=True)
)
return (item_type(i) for i in enum)
@t.overload
def __getitem__(self, idx: int) -> NamedTupleT:
    ...
@t.overload
def __getitem__(self, idx: slice) -> t.Self:
    ...
@t.overload
def __getitem__(self, idx: str) -> abc.Sequence[t.Any]:
    ...
def __getitem__(
    self, idx: slice | int | str
) -> NamedTupleT | t.Self | abc.Sequence[t.Any]:
    """
    Access sequences, items, or sub-segments.

    :param idx: One of the following:

        - ``str``: a sequence name; ``"i"`` stands for the list of
          the segment's indices.
        - ``int``: a position in the segment's own numbering.
        - ``slice``: inclusive boundaries of a sub-segment in the segment's
          own numbering. Boundaries lying outside the segment are clipped.
          A step is not supported.
    :return: A sequence, an item, or a new segment, respectively. Slices
        covering the whole segment return its deep copy; other slices
        return a child segment having this segment as a parent.
    :raises KeyError: If there is no sequence with the requested name.
    :raises IndexError: For an empty segment, zero index/boundary,
        a position outside the segment, a slice with a step, or a slice
        whose start exceeds its stop.
    :raises TypeError: If `idx` is of unsupported type.
    """
    idx_py: int | slice
    if isinstance(idx, str):
        if idx == "i":
            # An empty segment has no positions, consistently with
            # ``len()`` and iteration.
            if self.is_empty:
                return []
            return list(range(self.start, self.end + 1))
        return self._seqs[idx]
    if not isinstance(idx, (int, slice)):
        raise TypeError(f"Cannot index with type {type(idx)}")
    if self.is_empty:
        raise IndexError("No slicing/indexing for an empty segment")
    if isinstance(idx, int):
        if idx == 0:
            raise IndexError(
                "Segment uses 1-based indexing, 0 is reserved for "
                "an empty segment"
            )
        idx_py = _translate_idx(idx, self.start)
        # A negative offset means that `idx` precedes the start. It must be
        # rejected here: `nth` relies on `islice`, which fails with
        # a `ValueError` on negative arguments.
        it = None if idx_py < 0 else nth(iter(self), idx_py, None)
        if it is None:
            raise IndexError(
                f"Index {idx}->{idx_py} lies outside of [0, {len(self)}]"
            )
        return it
    # `idx` is a slice from here on.
    if idx.start == 0 or idx.stop == 0:
        raise IndexError(
            "Segment uses 1-based indexing, 0 is reserved for "
            "an empty segment"
        )
    if idx.step is not None:
        raise IndexError(
            'Cannot create non-consecutive copy of segment => "step" '
            "is slicing-incompatible"
        )
    # Clip boundaries lying outside the segment.
    if idx.start is not None and idx.start < self.start:
        idx = slice(None, idx.stop, idx.step)
    if idx.stop is not None and idx.stop > self.end:
        idx = slice(idx.start, None, idx.step)
    if idx.start is not None and idx.stop is not None and idx.start > idx.stop:
        raise IndexError(
            f"Start index {idx.start} is higher than end index {idx.stop}"
        )
    whole_segment = [
        (None, None),
        (None, self.end),
        (self.start, None),
        (self.start, self.end),
    ]
    if (idx.start, idx.stop) in whole_segment:
        return deepcopy(self)
    idx_py = _translate_idx(idx, self.start)
    try:
        seqs = {k: v[idx_py.start : idx_py.stop] for k, v in self._seqs.items()}
    except IndexError as e:
        raise IndexError(f"Failed to index sequences using {idx}") from e
    start = idx.start or self.start
    end = idx.stop or self.end
    return self.__class__(start, end, self.name, seqs, parent=self)
def __setitem__(self, key: str, value: abc.Sequence[t.Any]) -> None:
    """
    Store a sequence under the given name, overwriting an existing one.

    :param key: Sequence's name.
    :param value: The sequence itself; validated before it is stored.
    """
    self._validate_seq(key, value)
    self._seqs.update({key: value})
def __reversed__(self) -> abc.Iterator[NamedTupleT]:
    """
    :return: An iterator over the segment's items in the reversed order.
    """
    forward = iter(self)
    return always_reversible(forward)
def __contains__(self, item: object) -> bool:
    """
    Check if the segment contains a sequence or an index.

    :param item: A string is treated as a sequence name. An ``Ord``
        instance is checked against the segment's inclusive boundaries.
    :return: The result of the check or ``False`` for any other type.
    """
    if isinstance(item, str):
        return item in self._seqs
    if isinstance(item, Ord):
        return self.start <= item <= self.end
    return False
def __len__(self) -> int:
    """
    :return: The number of positions within inclusive boundaries or zero
        for an empty segment.
    """
    return 0 if self.is_empty else self.end - self.start + 1
def __and__(self, other: t.Self) -> t.Self:
    """
    Overlap this segment with `other` via :meth:`overlap_with`, deep-copying
    the sequences and using the ``"self"`` handle mode.

    :param other: Another segment.
    :return: The result of :meth:`overlap_with`.
    """
    deep_copy, handle_mode = True, "self"
    return self.overlap_with(other, deep_copy, handle_mode)
def __or__(self, other: t.Self) -> t.Self:
    """
    Append `other` to this segment via :meth:`append`.

    Each sequence name found in either segment gets its own filler:
    ``"*"``-filled string if the sequence is a ``str`` and a list of
    ``None`` otherwise. For names present in both segments, the type of
    the `other`'s sequence decides.

    :param other: Another segment.
    :return: The result of :meth:`append`.
    """
    def stars(size: int) -> str:
        return "*" * size
    def nones(size: int) -> list[None]:
        return [None] * size
    fillers: dict[str, _Filler] = {}
    for seqs in (self._seqs, other._seqs):
        for key, seq in seqs.items():
            fillers[key] = stars if isinstance(seq, str) else nones
    return self.append(other, filler=fillers)
def __eq__(self, other: t.Any) -> bool:
    """
    Compare two segments.

    :param other: Any object.
    :return: ``True`` if `other` is an instance of this segment's class and
        has the same boundaries, the same meta, and pairwise equal items.
    """
    # NOTE(review): the name is not compared here, whereas ``__hash__``
    # relies on the ID, which includes it. Thus, equal segments may have
    # different hashes -- confirm this is intended.
    if not isinstance(other, self.__class__):
        return False
    if not self.start == other.start:
        return False
    if not self.end == other.end:
        return False
    if not self.meta == other.meta:
        return False
    return all(mine == theirs for mine, theirs in zip(self, other))
def __hash__(self):
    """
    :return: The hash of the segment's :meth:`id`.
    """
    identifier = self.id
    return hash(identifier)
def __rshift__(self, idx: int) -> Segment:
    """
    Shift the boundaries to the right.

    :param idx: The size of the shift.
    :return: A new :class:`Segment` with both boundaries increased by `idx`,
        the same name, and the same sequences (passed without copying).
    :raises ValueError: If the segment is empty.
    """
    # NOTE(review): the result is always a base ``Segment``; parent, meta,
    # children, and variables are not passed to it.
    if self.is_empty:
        raise ValueError("Cannot shift an empty segment")
    new_start, new_end = self.start + idx, self.end + idx
    return Segment(new_start, new_end, self.name, seqs=self._seqs)
def __lshift__(self, idx: int) -> Segment:
    """
    Shift the boundaries to the left.

    :param idx: The size of the shift.
    :return: A new :class:`Segment` with both boundaries decreased by `idx`,
        the same name, and the same sequences (passed without copying).
    :raises ValueError: If the segment is empty.
    """
    # NOTE(review): the result is always a base ``Segment``; parent, meta,
    # children, and variables are not passed to it.
    if self.is_empty:
        raise ValueError("Cannot shift an empty segment")
    new_start, new_end = self.start - idx, self.end - idx
    return Segment(new_start, new_end, self.name, seqs=self._seqs)
def _validate_seq(self, name: str, seq: abc.Sequence):
    """
    Validate a single sequence and its name.

    :param name: Sequence's name.
    :param seq: The sequence itself.
    :raises FormatError: If ``is_valid_field_name`` rejects the name.
    :raises LengthMismatch: If the sequence's length differs from the
        segment's length.
    """
    if not is_valid_field_name(name):
        raise FormatError(
            f"Invalid field name {name}. "
            f"Please use a valid variable name starting with a letter"
        )
    seq_size, own_size = len(seq), len(self)
    if seq_size != own_size:
        raise LengthMismatch(
            f"Len({name})={seq_size} doesn't match the segment's length {own_size}"
        )
def _validate_seqs(self):
    """
    Check the stored sequences against the segment's length.

    Only the sequences whose length differs from the segment's length are
    passed to ``_validate_seq``.
    """
    # NOTE(review): names of the sequences with matching lengths are
    # not validated here -- presumably on purpose; confirm.
    expected = len(self)
    for key, seq in self._seqs.items():
        if len(seq) != expected:
            self._validate_seq(key, seq)
def _setup_and_validate(self):
    """
    Validate boundaries and stored sequences.

    :raises FormatError: If the start exceeds the end or if any of
        the boundaries is negative.
    """
    # Equivalent to: start > end or start < 0 or end < 0.
    if not 0 <= self.start <= self.end:
        raise FormatError(f"Invalid boundaries {self.start}, {self.end}")
    self._validate_seqs()
[docs]
def add_seq(self, name: str, seq: abc.Sequence[t.Any]) -> None:
    """
    Add a new sequence to this segment.

    :param name: Sequence's name. Must be accepted by
        ``is_valid_field_name``.
    :param seq: A sequence with arbitrary elements having the same length
        as the segment.
    :return: Nothing. The sequence is stored within this segment.
    :raise ValueError: If the `name` is invalid or the segment already
        contains a sequence with such a name.
    """
    if not is_valid_field_name(name):
        raise ValueError(
            f"Invalid field name {name}. "
            f"Please use a valid variable name starting with a letter"
        )
    if name in self:
        raise ValueError(
            f"Segment already contains {name}. "
            f"To overwrite existing sequences, use [] syntax"
        )
    self[name] = seq
[docs]
def append(
    self,
    other: t.Self,
    filler: _Filler | abc.Mapping[str, _Filler] = (lambda x: [None] * x),
    joiner: _Joiner | abc.Mapping[str, _Joiner] = op.add,
) -> t.Self:
    """
    Append another segment to this one.

    The sequences of both segments are deep-copied and then joined by
    `joiner`. A sequence missing from either of the segments is created
    by `filler`.

    >>> a = Segment(1, 3, "A", seqs={"A": "AAA"})
    >>> b = Segment(1, 2, "B", seqs={"B": "BB"})
    >>> c = a.append(b, filler=lambda x: '*' * x)
    >>> c.id
    'A|1-5'
    >>> c['A']
    'AAA**'
    >>> c['B']
    '***BB'

    The ``|`` operator does the same using the ``"*"`` filler for
    ``str``-type sequences, ``None`` for the rest, and the default `joiner`:

    >>> a | b == a.append(b, filler=lambda x: '*' * x)
    True

    .. note::
        Appending to an empty segment returns `other`.
        Appending an empty segment returns this segment.

    .. warning::
        The created segment has neither parent nor metadata.

    :param other: Another arbitrary segment.
    :param filler: A callable accepting a size and returning a filled
        sequence or a mapping from sequence names to such callables.
    :param joiner: A callable accepting two sequences and returning a merged
        sequence or a mapping from sequence names to such callables.
    :return: A new segment named as this segment, starting at the same
        position and extended by the length of `other`.
    :raises RuntimeError: If joining any pair of sequences fails.
    """
    def resolve(fn_or_mapping, key: str):
        # Either one callable for everything or a per-sequence mapping.
        if isinstance(fn_or_mapping, abc.Mapping):
            return fn_or_mapping[key]
        return fn_or_mapping
    def pad_missing(
        target: dict[str, abc.Sequence[t.Any]],
        reference: dict[str, abc.Sequence[t.Any]],
        size: int,
    ):
        for key in reference:
            if key not in target:
                target[key] = resolve(filler, key)(size)
    if self.is_empty:
        return other
    if other.is_empty:
        return self
    left = deepcopy(self._seqs)
    right = deepcopy(other._seqs)
    pad_missing(left, right, len(self))
    pad_missing(right, left, len(other))
    joined = {}
    # `dict.fromkeys` deduplicates names preserving their order.
    for key in dict.fromkeys(chain(left, right)):
        join = resolve(joiner, key)
        try:
            joined[key] = join(left[key], right[key])
        except Exception as e:
            raise RuntimeError(f"Failed to join {key}") from e
    new_end = self.start + len(self) + len(other) - 1
    return self.__class__(self.start, new_end, self.name, joined)
[docs]
def insert(self, other: t.Self, i: int, **kwargs) -> t.Self:
    """
    Insert a segment into this one.

    This segment is split in two parts: the first one ends at `i` and
    the second one begins at ``i + 1``. Then, `other` is placed between
    them via two consecutive :meth:`append` calls.

    .. note::
        Inserting an empty segment returns this instance.
        Inserting a segment at the :meth:`end` appends `other`.

    .. warning::
        The created segment has neither parent nor metadata.

    :param other: Another segment to insert.
    :param i: Index to insert at. The insertion is performed after `i`.
    :param kwargs: Passed to :meth:`append`.
    :return: A new segment with inserted `other`.
    :raises IndexError: If this segment is empty or the index is invalid.
        Only indices ``start <= i <= end`` are valid.
    """
    if other.is_empty:
        return self
    if self.is_empty:
        raise IndexError("Cannot insert into empty segment")
    if i == self.end:
        return self.append(other, **kwargs)
    if i < self.start or i >= self.end:
        raise IndexError("Provided index is outside of this segment's boundaries")
    head = self[self.start : i]
    tail = self[i + 1 : self.end]
    return head.append(other, **kwargs).append(tail, **kwargs)
[docs]
def remove_seq(self, name: str) -> None:
    """
    Remove a sequence from this segment.

    :param name: Sequence's name. Nothing happens if the segment has no
        sequence with such a name.
    """
    self._seqs.pop(name, None)
[docs]
def bounds(self, other: Segment) -> bool:
    """
    Check if this segment bounds `other`.

    ::

        self: +-------+
        other:   +----+
        => True

    :param other: Another segment.
    :return: ``True`` if `other` is empty or its boundaries lie within
        the boundaries of this segment.
    """
    if other.is_empty:
        return True
    return self.start <= other.start and other.end <= self.end
[docs]
def bounded_by(self, other: Segment) -> bool:
    """
    Check whether this segment is bounded by `other`.

    ::

        self:    +----+
        other: +------+
        => True

    :param other: Another segment.
    :return: ``True`` if this segment is empty or its boundaries lie within
        the boundaries of `other`.
    """
    if self.is_empty:
        return True
    return other.start <= self.start and self.end <= other.end
[docs]
def overlaps(self, other: Segment) -> bool:
    """
    Check whether this segment overlaps with the other one.

    Use :meth:`overlap_with` to produce an overlapping child
    :class:`Segment`.

    :param other: other :class:`Segment` instance.
    :return: ``True`` if the segments share at least one position or if
        either of them is empty. ``False`` otherwise.
    """
    if self.is_empty or other.is_empty:
        return True
    return other.start <= self.end and self.start <= other.end
[docs]
def overlap_with(
    self,
    other: Segment,
    deep_copy: bool = True,
    handle_mode: str = "merge",
    sep: str = "&",
) -> t.Self:
    """
    Overlap this segment with `other` over common indices.

    ::

        self:  +---------+
        other:     +-------+
        =>:        +-----+

    :param other: other :class:`Segment` instance.
    :param deep_copy: deepcopy the subset sequences to avoid side effects.
    :param handle_mode: Defines where the name, meta, and sequences of
        the created segment come from:

        - "merge": combine the data of `self` and `other`; on conflicts,
          the meta values and sequences of `other` take precedence.
        - "self": the current instance provides the data.
        - "other": `other` provides the data.
    :param sep: If `handle_mode` == "merge", the new name is created
        by joining names of `self` and `other` using this separator.
    :return: A new segment with this instance as a parent. If this segment
        is empty, it is returned as is. If `other` is empty, a new empty
        segment is returned. A warning is issued in both cases.
    :raises NoOverlap: If the segments do not overlap.
    :raises ValueError: If the `handle_mode` is not supported.
    """
    if not self.overlaps(other):
        raise NoOverlap(f"Segments {self} and {other} do not overlap")
    if self.is_empty:
        warnings.warn("Overlapping empty & non-empty always results in empty")
        return self
    if other.is_empty:
        warnings.warn("Overlapping non-empty & empty always results in empty")
        return self.__class__(0, 0)
    start = max(self.start, other.start)
    end = min(self.end, other.end)
    def window(source: Segment) -> dict[str, abc.Sequence]:
        # Convert the overlap's boundaries into the offsets within `source`.
        lo, hi = start - source.start, end - source.start
        return {key: seq[lo : hi + 1] for key, seq in source._seqs.items()}
    name: str | None
    if handle_mode == "merge":
        meta = {**self.meta, **other.meta}
        seqs = {**window(self), **window(other)}
        name = sep.join(map(str, [self.name, other.name]))
    elif handle_mode == "self":
        meta, name, seqs = self.meta, self.name, window(self)
    elif handle_mode == "other":
        meta, name, seqs = other.meta, other.name, window(other)
    else:
        raise ValueError(
            f"Handle mode {handle_mode} is not supported. "
            f"Supported modes are {DATA_HANDLE_MODES}"
        )
    # A shallow copy makes the new segment's meta a separate dictionary.
    meta = copy(meta)
    if deep_copy:
        seqs = deepcopy(seqs)
    return self.__class__(start, end, name, seqs=seqs, parent=self, meta=meta)
[docs]
def overlap(self, start: int, end: int) -> t.Self:
    """
    Create a new segment by overlapping this one with the provided
    boundaries.

    :param start: Starting coordinate.
    :param end: Ending coordinate.
    :return: The result of :meth:`overlap_with` invoked in the ``"self"``
        handle mode with deep-copying.
    :raises NoOverlap: If ``[start, end]`` does not overlap with this
        segment.
    """
    probe = self.__class__(start, end)
    if self.overlaps(probe):
        return self.overlap_with(probe, True, "self")
    raise NoOverlap
[docs]
def sub_by(self, other: Segment, **kwargs) -> t.Self:
    """
    A specialized version of :meth:`overlap_with` for cases where `other`
    must be a part of the current segment (hence, a subsegment).

    :param other: Some other segment contained within the
        (`start`, `end`) boundaries.
    :param kwargs: Passed to :meth:`overlap_with`.
    :return: The result of :meth:`overlap_with`. See its documentation
        on how the names and data are handled.
    :raises NoOverlap: If this segment does not bound `other`
        (see :meth:`bounds`).
    """
    if self.bounds(other):
        return self.overlap_with(other, **kwargs)
    raise NoOverlap(
        f"Provided ({other.start, other.end}) boundaries are not "
        f"within the existing boundaries ({self.start, self.end})"
    )
[docs]
def sub(self, start: int, end: int, **kwargs) -> t.Self:
    """
    Subset the current segment using provided boundaries.

    Creates a temporary :class:`Segment` with these boundaries and passes
    it to :meth:`sub_by`.

    :param start: new start.
    :param end: new end.
    :param kwargs: passed to :meth:`sub_by` and, subsequently,
        to :meth:`overlap_with`.
    :return: The result of :meth:`sub_by`.
    """
    boundaries = Segment(start, end)
    return self.sub_by(boundaries, **kwargs)
def _translate_idx(idx: T, offset: int) -> T:
    """
    Translate an index or a slice from the segment's numbering into
    zero-based offsets.

    :param idx: An ``int``, a ``slice``, or anything else.
    :param offset: The value subtracted from the index, e.g.,
        a segment's start.
    :return: For an ``int`` -- the difference ``idx - offset``. For
        a ``slice`` -- a new slice with the same step: its start is
        shifted by `offset` and can't fall below zero; its stop is shifted
        and incremented by one so that the original stop is included.
        ``None`` boundaries are preserved. Objects of other types are
        returned as is.
    """
    if isinstance(idx, slice):
        lo = None if idx.start is None else max(idx.start - offset, 0)
        hi = None if idx.stop is None else idx.stop - offset + 1
        # Mypy fails at type narrowing here.
        # See https://github.com/python/mypy/issues/14045
        return t.cast(T, slice(lo, hi, idx.step))
    if isinstance(idx, int):
        return t.cast(T, idx - offset)
    return idx
[docs]
def segments2graph(segments: abc.Iterable[Segment]) -> nx.Graph:
    """
    Convert segments to an undirected graph such that segments are nodes
    and edges are drawn between overlapping segments.

    Whether a segment is already present in the graph is decided by graph
    membership rather than by plain equality: segments may compare equal
    while having different hashes and thus be distinct nodes. Such nodes
    must be connected if they overlap.

    :param segments: an iterable with segments objects.
    :return: an undirected graph.
    """
    g = nx.Graph()
    for s in segments:
        if s in g:
            # The node exists already: its edges were drawn when it was
            # added and when each of the later nodes was added.
            continue
        # Collected before adding `s`, so `s` can't be its own neighbor.
        neighbors = [n for n in g.nodes if s.overlaps(n)]
        g.add_node(s)
        g.add_edges_from((s, n) for n in neighbors)
    return g
[docs]
def do_overlap(segments: abc.Iterable[Segment]) -> bool:
    """
    Check if any pair of segments overlap.

    :param segments: an iterable with segments. Fewer than two segments
        form no pairs, which results in ``False``.
    :return: ``True`` if there are overlapping segments, ``False`` otherwise.
    """
    for first, second in combinations(segments, 2):
        if first.overlaps(second):
            return True
    return False
[docs]
def resolve_overlaps(
    segments: abc.Iterable[Segment],
    value_fn: abc.Callable[[Segment], float] = len,
    max_it: int | None = None,
    verbose: bool = False,
) -> abc.Generator[Segment, None, None]:
    """
    Eliminate overlapping segments.

    Convert segments into an undirected graph (see :func:`segments2graph`).
    Iterate over connected components.
    If a component has only a single node (no overlaps), yield it.
    Otherwise, consider all possible non-overlapping subsets of nodes.
    Find a subset such that the sum of the `value_fn` over the segments is
    maximized and yield nodes from it.

    :param segments: A collection of possibly overlapping segments.
    :param value_fn: A function accepting the segment and returning its value.
    :param max_it: The maximum number of subsets to consider when resolving a
        group of overlapping segments. Subsets are considered in the order
        of increasing size. ``None`` and non-positive values impose
        no limit.
    :param verbose: Progress bar and general info.
    :return: A collection of non-overlapping segments with maximum cumulative
        value. Note that the optimal solution is guaranteed iff the number of
        possible subsets for an overlapping group does not exceed `max_it`.
    """
    # TODO: option to fallback to a greedy strategy when reaching `max_it`
    g = segments2graph(segments)
    ccs = nx.connected_components(g)
    if verbose:
        # Sizes are needed for logging => exhaust the generator once and
        # reuse the result below instead of recomputing the components.
        ccs = list(ccs)
        LOGGER.info(
            f"Found {len(ccs)} connected components with sizes: "
            f"{list(map(len, ccs))}"
        )
    for i, cc in enumerate(ccs, start=1):
        if len(cc) == 1:
            yield cc.pop()
            continue
        subsets: abc.Iterable | list = powerset(cc)
        if verbose:
            subsets = tqdm(subsets, desc=f"Resolving cc {i} with size: {len(cc)}")
        if max_it is not None and max_it > 0:
            subsets = take(max_it, subsets)
        # The first subset is always empty, so there's always a candidate.
        non_overlapping = filterfalse(do_overlap, subsets)
        yield from max(non_overlapping, key=lambda xs: sum(map(value_fn, xs)))
[docs]
def map_segment_numbering(
    segments_from: t.Sequence[Segment], segments_to: t.Sequence[Segment]
) -> abc.Iterator[tuple[int, int | None]]:
    """
    Create a continuous mapping between the numberings of two segment
    collections. They must contain the same number of equal length
    non-overlapping segments. Segments in the `segments_from` collection are
    considered to span a continuous sequence, possibly interrupted due to
    discontinuities in a sequence represented by `segments_to`'s segments.
    Hence, the segments in `segments_from` form continuous numbering over
    which numberings of `segments_to` segments are joined.

    .. note::
        Only consecutive segments within each collection are checked
        for overlaps.

    :param segments_from: A sequence of segments to map from.
    :param segments_to: A sequence of segments to map to.
    :return: An iterable over (key, value) pairs. Keys correspond to numberings
        of the `segments_from`, values -- to numberings of `segments_to`.
        Positions falling between consecutive `segments_to` segments have
        ``None`` values. Empty collections result in an empty iterator.
    :raises LengthMismatch: If the collections or the paired segments differ
        in length.
    :raises OverlapError: If consecutive segments within either of
        the collections overlap.
    """
    if len(segments_to) != len(segments_from):
        raise LengthMismatch("Segment collections must be of the same length")
    if not segments_from:
        # Nothing to map; avoids indexing into empty collections below.
        return iter(())
    for s1, s2 in zip(segments_from, segments_to):
        if len(s1) != len(s2):
            raise LengthMismatch(
                f"Lengths of segments must match. "
                f"Got len({s1})={len(s1)}, len({s2})={len(s2)}"
            )
    for s1, s2 in zip(segments_from, segments_from[1:]):
        if s2.overlaps(s1):
            raise OverlapError(f"Segments {s1},{s2} in `segments_from` overlap")
    for s1, s2 in zip(segments_to, segments_to[1:]):
        if s2.overlaps(s1):
            raise OverlapError(f"Segments {s1},{s2} in `segments_to` overlap")
    # The distance from the end of each segment to the start of the next
    # one; nothing follows the last segment, hence the trailing zero.
    hole_sizes = chain(
        ((s2.start - s1.end) for s1, s2 in zip(segments_to, segments_to[1:])), (0,)
    )
    keys = range(segments_from[0].start, segments_from[-1].end + 1)
    values = chain.from_iterable(
        chain(
            range(s.start, s.end + 1),
            # ``None`` for each position strictly between two segments.
            (None for _ in range(s.end + 1, h + s.end)),
        )
        for s, h in zip(segments_to, hole_sizes)
    )
    return zip(keys, values)
# Running this module as a script is not supported: it fails immediately.
if __name__ == "__main__":
    raise RuntimeError