Source code for muxpack.io

"""Input and output helpers for the muxpack on-disk layout.

This module provides low-level read/write functions used by high-level classes
such as :class:`muxpack.Multiplex` and :class:`muxpack.MultiplexSeries`.
"""

import ibis

from muxpack.bipartite import Bipartite
from .multiplexseries import MultiplexSeries
from pathlib import Path
import os
import logging
from typing import Tuple

logger = logging.getLogger(__name__)


[docs] def read_multiplexseries(dir: Path) -> MultiplexSeries: """ Load a multiplex series from a directory containing Parquet files. The expected directory structure is:: dir/ <period>/ edges/ <layer>/ *.parquet vertices.parquet Args: - dir: path to the root directory containing the Parquet files. Returns: - MultiplexSeries loaded from the directory. """ logger.info("Loading data from {dir}...") con = ibis.duckdb.connect() logger.info("Loading edges...") edges = con.read_parquet(f"{dir}/*/edges/**/*.parquet", table_name="edges") logger.info("Loading vertices") try: vertices = ibis.read_parquet(f"{dir}/*/vertices.parquet", table_name="vertices") except Exception as e: logger.info(f"No vertices found: {e}") vertices = None try: relationtypes = ibis.read_parquet(f"{dir}/*/relationtypes.csv") except Exception as e: logger.info(f"No relationtypes found: {e}") relationtypes = None m = MultiplexSeries(edges=edges, vertices=vertices, relationtypes=relationtypes) return m
# def save_multiplexseries( # edges: ibis.Table, # vertices: ibis.Table, # dir: Path | str, # existing_data_behavior="delete_matching", # **kwargs, # ) -> Tuple[ibis.Table, ibis.Table]: # """ # Save edges and vertices to disk following the muxpack directory structure. # The directory and all sub-directories are created if they do not exist. # Edges and vertices are not validated for consistency. # Args: # - edges: edge table to save. # - vertices: vertex table to save. # - dir: root path where the network will be saved. # - existing_data_behavior: passed through to ``pyarrow.dataset.write_dataset``. # - **kwargs: additional keyword arguments forwarded to ``pyarrow.dataset.write_dataset``. # Returns: # - Tuple of ``(edges, vertices)`` table objects pointing to the saved files. # """ # E = edges # V = vertices # dir = Path(dir) # logger.info(f"Saving network to {dir}...") # # We do a manual partitioning to have maximum control. # # alternative and potentially more efficient would be partitioning using # # duckdb, however, that would pose some problems: # # - Hive naming convention does not follow the muxpack specification # # - Hive partitioning removes columns that are partitioned. # periods = E[["period"]].distinct().period.to_list() # for period in periods: # period_dir = dir / f"{period}" # os.makedirs(period_dir, exist_ok=True) # # writing vertices # vertices_file = period_dir / "vertices.parquet" # V_period = V.filter(V.period == period) # V_period.to_parquet(vertices_file) # # writing edges # edges_dir = period_dir / "edges" # os.makedirs(edges_dir, exist_ok=True) # E_period = E.filter(E.period == period) # layers = E_period[["layer"]].distinct().layer.to_list() # logger.info(f"layers: {layers}") # for layer in layers: # layer_dir = edges_dir / f"{layer}" # # TODO further partition? # os.makedirs(layer_dir, exist_ok=True) # E_period_layer = E_period.filter(E_period.layer == layer).order_by( # ["src", "relationtype", "dst"] # ) # E_period_layer.to_parquet_dir( # layer_dir, existing_data_behavior=existing_data_behavior, **kwargs # ) # logger.info(f"\t\tSaved layer {layer}") # logger.info(f"\tFinished saving period {period}") # logger.info(f"Finished saving network to {dir}.") # con = ibis.duckdb.connect() # edges = con.read_parquet(f"{dir}/*/edges/**/*.parquet", table_name="edges") # vertices = con.read_parquet(f"{dir}/*/vertices.parquet", table_name="vertices") # return edges, vertices
[docs] def save_multiplex( edges: ibis.Table, vertices: ibis.Table, dir: Path | str, period: int | None, existing_data_behavior="delete_matching", **kwargs, ) -> Tuple[ibis.Table, ibis.Table]: """ Save a single-period multiplex to disk following the muxpack directory structure. The directory and all sub-directories are created if they do not exist. Edges and vertices are not validated for consistency. Args: - edges: edge table to save. - vertices: vertex table to save. - period: the period for this multiplex; if ``None``, all rows in ``edges`` are written to the same directory. - dir: root path where the multiplex will be saved. - existing_data_behavior: passed through to ``pyarrow.dataset.write_dataset``. - **kwargs: additional keyword arguments forwarded to ``pyarrow.dataset.write_dataset``. Returns: - Tuple of ``(edges, vertices)`` table objects pointing to the saved files. """ E = edges V = vertices dir = Path(dir) logger.info(f"Saving multiplex to {dir}...") # We do a manual partitioning to have maximum control. # alternative and potentially more efficient would be partitioning using # duckdb, however, that would pose some problems: # - Hive naming convention does not follow the muxpack specification # - Hive partitioning removes columns that are partitioned. os.makedirs(dir, exist_ok=True) # writing vertices vertices_file = dir / "vertices.parquet" if period is not None: # test if period column is there, if not add it to V = V.filter(V.period == period) V.to_parquet(vertices_file) # writing edges edges_dir = dir / "edges" os.makedirs(edges_dir, exist_ok=True) E_period = E.filter(E.period == period) layers = E_period[["layer"]].distinct().layer.to_list() logger.info(f"layers: {layers}") for layer in layers: layer_dir = edges_dir / f"{layer}" # TODO further partition? os.makedirs(layer_dir, exist_ok=True) E_period_layer = E_period.filter(E_period.layer == layer).order_by( ["src", "relationtype", "dst"] ) E_period_layer.to_parquet_dir( layer_dir, existing_data_behavior=existing_data_behavior, **kwargs ) logger.info(f"\t\tSaved layer {layer}") logger.info("\tFinished saving") con = ibis.duckdb.connect() edges = con.read_parquet(f"{dir}/edges/**/*.parquet", table_name="edges") vertices = con.read_parquet(f"{dir}/vertices.parquet", table_name="vertices") return edges, vertices
[docs] def save_multiplexseries( edges: ibis.Table, vertices: ibis.Table, dir: Path | str, existing_data_behavior="delete_matching", **kwargs ) -> Tuple[ibis.Table, ibis.Table]: """ Save edges and vertices to disk following the muxpack directory structure. The directory and all sub-directories are created if they do not exist. Edges and vertices are not validated for consistency. Args: - edges: edge table to save. - vertices: vertex table to save. - dir: root path where the network will be saved. - existing_data_behavior: passed through to ``pyarrow.dataset.write_dataset``. - **kwargs: additional keyword arguments forwarded to ``pyarrow.dataset.write_dataset``. Returns: - Tuple of ``(edges, vertices)`` table objects pointing to the saved files. """ dir = Path(dir) periods: list[str] = ( edges .select("period") .distinct() .order_by("period") .period .to_list() ) for period in periods: E = edges.filter(edges.period == period) V = vertices.filter(vertices.period == period) speriod = f"{period}" save_multiplex(edges=E, vertices=V, dir=dir / speriod, period=period, existing_data_behavior=existing_data_behavior, **kwargs) mp = read_multiplexseries(dir) return mp.edges, mp.vertices
[docs] def save_bipartite( edges: ibis.Table, role_src: str, role_dst: str, relationtype: str, dir: Path | str ) -> None: """ Save a bipartite graph to disk as a Parquet file plus a JSON metadata file. Args: - edges: edge table to save. - role_src: column name used for the source role. - role_dst: column name used for the destination role. - relationtype: column name used for the relation type. - dir: path to the directory where the files will be saved. """ dir = Path(dir) os.makedirs(dir, exist_ok=True) edges.to_parquet(dir / "edges.parquet") json_content = { "role_src": role_src, "role_dst": role_dst, "relationtype": relationtype, } with open(dir / "metadata.json", "w") as f: import json json.dump(json_content, f)
[docs] def read_bipartite(dir: Path | str) -> Bipartite: """ Load a bipartite graph from disk. Args: - dir: path to the directory containing ``edges.parquet`` and ``metadata.json``. Returns: - BiPartite object with edges and metadata loaded from disk. """ dir = Path(dir) edges = ibis.read_parquet(dir / "edges.parquet") with open(dir / "metadata.json", "r") as f: import json metadata = json.load(f) role_src = metadata["role_src"] role_dst = metadata["role_dst"] relationtype = metadata["relationtype"] return Bipartite( edges=edges, role_src=role_src, role_dst=role_dst, relationtype=relationtype )
if __name__ == "__main__": logging.basicConfig(level=logging.INFO) m = read_multiplexseries("data") save_multiplexseries(edges=m.edges, vertices=m.vertices, dir="data2")