Source code for muxpack.multiplexseries

"""Multi-period multiplex graph representation.

This module defines :class:`MultiplexSeries`, which stores edges across periods
and exposes filtering, per-period access, collapsing, and persistence helpers.
"""

import ibis

from .check import check_edges, check_vertices
from pathlib import Path
from . import io
from .multiplex import Multiplex
import logging
from typing import Tuple

logger = logging.getLogger(__name__)

class MultiplexSeries:
    """
    A multiplexseries is a series of Multiplex graphs with multiple layers,
    spanning multiple periods.

    All tables are ibis expressions. Unless stated otherwise on a method,
    operations only build expressions and are evaluated when the data is
    materialized (``to_list``, ``to_pyarrow``, ``to_pandas``, saving, ...).
    """

    #: The edges of the multiplex. This is a table with columns "src", "dst",
    #: "period", "layer" and "relationtype".
    edges: ibis.Table
    #: The vertices of the multiplex. This is a table with columns "id" and
    #: "period", plus optional additional columns. ``None`` when not provided
    #: and not (yet) derived from the edges.
    vertices: ibis.Table | None
    #: The distinct vertex ids (single column "id"), independent of period.
    #: ``None`` as long as ``vertices`` is ``None``.
    vertex_ids: ibis.Table | None
    #: Table with columns "relationtype", "layer" and "label", or ``None``.
    relationtypes: ibis.Table | None

    def __init__(
        self,
        edges: ibis.Table,
        vertices: ibis.Table | None = None,
        relationtypes: ibis.Table | None = None,
    ) -> None:
        """
        Initialize a multiplex series with the given edges and vertices tables.

        Args:
            - edges: table with columns ``src``, ``dst``, ``period``, ``layer``,
              and ``relationtype``.
            - vertices: table with column ``id``, ``period``, and optional
              additional columns. Must have a ``period`` column because the
              edges table has one.
            - relationtypes: table with columns ``relationtype``, ``layer``,
              ``label``, and optional additional columns. Stored as given; it
              is not validated here.

        Raises:
            - ValueError: if the edges table does not satisfy the required schema.
            - ValueError: if the vertices table does not satisfy the required schema.
        """
        if not check_edges(edges):
            raise ValueError("Invalid edges table")
        if vertices is not None and not check_vertices(vertices):
            raise ValueError("Invalid vertices table")

        self.edges = edges
        # TODO derive vertices from edges if not provided
        self.vertices = vertices
        self.relationtypes = relationtypes
        # Always define vertex_ids so that reading the attribute never raises
        # AttributeError when no vertices table was supplied.
        self.vertex_ids = None
        if vertices is not None:
            logger.info("Vertices table provided, using it as is.")
            self.vertex_ids = vertices[["id"]].distinct()

    def _sorted_distinct(self, column: str) -> list:
        """Return the sorted distinct values of one column of the edges table."""
        return (
            self.edges.select(column)
            .distinct()
            .order_by(column)[column]
            .to_list()
        )

    def _derive_vertices(self) -> ibis.Table:
        """
        Build the (lazy) table of distinct ``(id, period)`` pairs that occur
        as ``src`` or ``dst`` in the edges table.
        """
        src = self.edges.select(id="src", period="period").distinct()
        dst = self.edges.select(id="dst", period="period").distinct()
        return src.union(dst, distinct=True)

    def periods(self) -> list[int]:
        """
        Get the list of periods present in the multiplex series.

        This executes a query on the edges table.

        Returns:
            - Sorted list of period values.
        """
        return self._sorted_distinct("period")

    def layers(self) -> list[str]:
        """
        Get the list of layers present in the multiplex series.

        This executes a query on the edges table.

        Returns:
            - Sorted list of layer names.
        """
        return self._sorted_distinct("layer")

    def update_vertices(self) -> None:
        """
        Update the vertices table by deriving it from the edges table.

        This is useful when the vertices table was not provided at
        initialization. Both ``self.vertices`` and ``self.vertex_ids`` are
        updated in place. The derived tables are materialized (via pyarrow)
        and stored as in-memory tables, so the edges query is executed here.
        """
        V = self._derive_vertices()
        # A vertex that is present in several periods must appear only once in
        # vertex_ids, consistent with how __init__ builds it.
        V_ids = V.select("id").distinct()
        self.vertices = ibis.memtable(V.to_pyarrow())
        self.vertex_ids = ibis.memtable(V_ids.to_pyarrow())

    def update_relationtypes(self) -> None:
        """
        Update the relationtypes table by deriving it from the edges table.

        This is useful when the relationtypes table was not provided at
        initialization. A ``label`` column is constructed as
        ``"<layer>_<relationtype>"``. ``self.relationtypes`` is updated in
        place with an in-memory table; the edges query is executed here.
        """
        relationtypes = (
            self.edges.select(self.edges.relationtype, self.edges.layer)
            .distinct()
            .order_by("layer", "relationtype")
            .to_pandas()
            .assign(
                label=lambda df: (
                    df["layer"].astype(str) + "_" + df["relationtype"].astype(str)
                )
            )
        )
        logger.debug(
            "Updated relationtypes table with %d unique relationtypes.",
            len(relationtypes),
        )
        self.relationtypes = ibis.memtable(relationtypes)

    def get_multiplex(self, period: int) -> Multiplex:
        """
        Return the multiplex for a specific period.

        Args:
            - period: the period to retrieve.

        Returns:
            - Multiplex object containing only the edges and vertices for the
              given period. Its vertices are ``None`` when this series has no
              vertices table.
        """
        E_y = self.edges.filter(self.edges.period == period)
        if self.vertices is not None:
            V_y = self.vertices.filter(self.vertices.period == period)
        else:
            V_y = None
        return Multiplex(edges=E_y, vertices=V_y, period=period)

    def multiplexes(self) -> list[Tuple[int, Multiplex]]:
        """
        Return all multiplexes in the series, one per period.

        Returns:
            - List of ``(period, Multiplex)`` tuples, ordered by period.
        """
        return [(period, self.get_multiplex(period)) for period in self.periods()]

    def add_filter(
        self,
        periods: list[int] | None = None,
        layers: dict[str, list[int] | None] | None = None,
        src: list[int] | None = None,
        dst: list[int] | None = None,
    ) -> None:
        """
        Apply a filter to the multiplex series in place.

        Filtering is lazy: the filter is only executed when saving or
        converting to another format. Passing ``None`` or an empty
        list/dict for any argument means no filtering is applied for that
        dimension. The different dimensions are combined with AND. Only
        ``self.edges`` is modified; the vertices table is left untouched.
        For advanced filtering, modify the ``edges`` property directly using
        ibis expressions.

        Args:
            - periods: list of periods to keep.
            - layers: dict of {layer:[relationtype]} to keep. A value of
              ``None`` keeps every relationtype of that layer. Multiple
              layers are combined with OR.
            - src: list of source vertex ids (ego) to keep.
            - dst: list of destination vertex ids (non-ego) to keep.
        """
        E = self.edges
        flt: list[ibis.BooleanValue] = []

        if periods is not None and len(periods) > 0:
            flt.append(E.period.isin(periods))

        if layers is not None and len(layers) > 0:
            per_layer = []
            for layer, relationtypes in layers.items():
                cond = E.layer == layer
                if relationtypes is not None:
                    cond = ibis.and_(cond, E.relationtype.isin(relationtypes))
                per_layer.append(cond)
            # ibis.or_ is variadic: the conditions must be unpacked, otherwise
            # the list is passed through unchanged and is not OR-ed together.
            flt.append(ibis.or_(*per_layer))

        logger.debug("Filter: %s", flt)

        # Apply the boolean predicates first: they were built on the columns
        # of the current table, before the semi joins below rebind ``E``.
        if len(flt):
            E = E.filter(flt)

        if src is not None and len(src) > 0:
            vid = ibis.memtable({"id": src})
            # we use semi join because we expect the vertex list to be large
            E = E.semi_join(vid, E.src == vid.id)

        if dst is not None and len(dst) > 0:
            vid = ibis.memtable({"id": dst})
            # we use semi join because we expect the vertex list to be large
            E = E.semi_join(vid, E.dst == vid.id)

        self.edges = E

    def __copy__(self) -> "MultiplexSeries":
        """
        Return a shallow copy of this MultiplexSeries.

        Returns:
            - A new MultiplexSeries sharing the same ``edges``, ``vertices``
              and ``relationtypes`` tables. The tables are re-validated by the
              constructor.
        """
        return MultiplexSeries(self.edges, self.vertices, self.relationtypes)

    def collapse(self) -> Multiplex:
        """
        Collapse the multiplex series into a single Multiplex by discarding
        period information.

        Duplicate edges across periods are removed. This is useful for
        analyses that do not require temporal information.

        Returns:
            - Multiplex containing all distinct edges across all periods, with
              ``period=None``. When a vertices table is present, only its
              distinct ``id`` column is passed on.
        """
        E = self.edges.select(["src", "dst", "layer", "relationtype"]).distinct()
        if self.vertices is not None:
            V = self.vertices.select("id").distinct()
        else:
            V = None
        return Multiplex(edges=E, vertices=V, period=None)

    def collapse_to(self, dir: Path | str) -> None:
        """
        Collapse the multiplex series and save the result to disk.

        This is a convenience method equivalent to calling ``collapse()``
        followed by ``Multiplex.save()``.

        Args:
            - dir: path to the directory where the collapsed Multiplex will be
              saved.

        Returns:
            - Whatever ``Multiplex.save`` returns is passed through unchanged
              (kept for backward compatibility; presumably ``None`` - confirm
              against ``Multiplex.save``).
        """
        m = self.collapse()
        return m.save(dir=dir)

    def save(self, dir: Path | str, **kw_args) -> None:
        """
        Save the multiplex series to disk.

        Writing is delegated to ``io.save_multiplexseries``; per the original
        documentation the directory is created if it does not exist and
        existing files are overwritten. Saving also evaluates the lazy
        ``edges`` and ``vertices`` expressions and replaces them with the
        tables returned by ``io.save_multiplexseries``, which can improve
        subsequent performance. If no vertices table is present, one is
        derived from the edges before saving.

        Args:
            - dir: path to the directory where the MultiplexSeries will be saved.
            - **kw_args: additional keyword arguments forwarded to
              ``io.save_multiplexseries``.
        """
        vertices = self.vertices
        if vertices is None:
            # Derive and materialize the vertices directly instead of building
            # (and re-validating) a temporary MultiplexSeries.
            vertices = ibis.memtable(self._derive_vertices().to_pyarrow())

        E, V = io.save_multiplexseries(self.edges, vertices, dir=dir, **kw_args)
        self.edges = E
        self.vertices = V
        # Keep vertex_ids consistent with the vertices table we now point at.
        self.vertex_ids = V[["id"]].distinct() if V is not None else None