From 6e2a34a6deb9c2d6cba1eb55aade109334b84854 Mon Sep 17 00:00:00 2001
From: Erik Welch
Date: Mon, 27 Feb 2023 15:22:42 +0100
Subject: [PATCH] Add `normalize_chunksize` and `partition` utility functions

---
 .pre-commit-config.yaml                      |   2 +-
 graphblas_algorithms/nxapi/_utils.py         | 127 ++++++++++++++++++
 graphblas_algorithms/nxapi/cluster.py        |  30 ++---
 .../nxapi/shortest_paths/weighted.py         |  18 +--
 .../nxapi/tests/test_utils.py                |  33 +++++
 pyproject.toml                               |   1 +
 6 files changed, 181 insertions(+), 30 deletions(-)
 create mode 100644 graphblas_algorithms/nxapi/_utils.py
 create mode 100644 graphblas_algorithms/nxapi/tests/test_utils.py

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 43f40ff..ba3e8e9 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -71,7 +71,7 @@ repos:
       additional_dependencies: [tomli]
       files: ^(graphblas_algorithms|docs)/
   - repo: https://github.com/charliermarsh/ruff-pre-commit
-    rev: v0.0.249
+    rev: v0.0.252
     hooks:
       - id: ruff
   - repo: https://github.com/pre-commit/pre-commit-hooks
diff --git a/graphblas_algorithms/nxapi/_utils.py b/graphblas_algorithms/nxapi/_utils.py
new file mode 100644
index 0000000..db309a4
--- /dev/null
+++ b/graphblas_algorithms/nxapi/_utils.py
@@ -0,0 +1,127 @@
+from math import ceil
+from numbers import Number
+
+try:
+    from itertools import pairwise  # Added in Python 3.10
+except ImportError:
+
+    def pairwise(it):
+        it = iter(it)
+        for prev in it:
+            for cur in it:
+                yield (prev, cur)
+                prev = cur
+
+
+BYTES_UNITS = {
+    "": 1,
+    "b": 1,
+    "kb": 1000,
+    "mb": 1000**2,
+    "gb": 1000**3,
+    "tb": 1000**4,
+    "pb": 1000**5,
+    "eb": 1000**6,
+    "zb": 1000**7,
+    "kib": 1024,
+    "mib": 1024**2,
+    "gib": 1024**3,
+    "tib": 1024**4,
+    "pib": 1024**5,
+    "eib": 1024**6,
+    "zib": 1024**7,
+}
+
+
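+# Editor's illustration (not part of the original commit): ``pairwise``
+# yields consecutive overlapping pairs, and ``BYTES_UNITS`` maps a
+# lowercased suffix to its scale in bytes:
+#
+#     >>> list(pairwise([0, 10, 20, 30]))
+#     [(0, 10), (10, 20), (20, 30)]
+#     >>> 2 * BYTES_UNITS["kib"]
+#     2048
+
+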
+def normalize_chunksize(chunksize, itemsize=1, N=None):
+    if chunksize is None:
+        return None
+    if isinstance(chunksize, Number):
+        rv = int(chunksize)
+        if rv <= 0 or N is not None and rv >= N:
+            return None
+        return rv
+    if not isinstance(chunksize, str):
+        raise TypeError(f"chunksize must be a number or a string; got {type(chunksize)}")
+    chunkstring = chunksize.replace(" ", "").replace("_", "").lower()
+    if not chunkstring or chunkstring == "all":
+        return None
+    for i, c in enumerate(reversed(chunkstring)):
+        if c.isdigit():
+            index = len(chunkstring) - i
+            break
+    else:
+        chunkstring = f"1{chunkstring}"
+        index = 1
+
+    prefix = chunkstring[:index]
+    suffix = chunkstring[index:]
+
+    try:
+        number = float(prefix)
+    except ValueError as exc:
+        raise ValueError(
+            f"Bad chunksize: {chunksize!r}. Could not interpret {prefix!r} as a number."
+        ) from exc
+
+    if suffix in {"chunk", "chunks"}:
+        if number <= 1:
+            return None
+        if N is None:
+            raise TypeError(
+                f"N argument is required to determine chunksize to split into {int(number)} chunks"
+            )
+        rv = ceil(N / number)
+    else:
+        scale = BYTES_UNITS.get(suffix)
+        if scale is None:
+            raise ValueError(
+                f"Bad chunksize: {chunksize!r}. Could not interpret {suffix!r} as a bytes unit."
+            )
+        number *= scale
+        if chunkstring[-1] == "b":
+            number = max(1, number / itemsize)
+        rv = int(round(number))
+    if rv <= 0 or N is not None and rv >= N:
+        return None
+    return rv
+
+
+def partition(chunksize, L, *, evenly=True):
+    """Partition a list into chunks"""
+    N = len(L)
+    if N == 0:
+        return
+    chunksize = int(chunksize)
+    if chunksize <= 0 or chunksize >= N:
+        yield L
+        return
+    if chunksize == 1:
+        yield from L
+        return
+    if evenly:
+        k = ceil(N / chunksize)
+        if k * chunksize != N:
+            yield from split_evenly(k, L)
+            return
+    for start, stop in pairwise(range(0, N + chunksize, chunksize)):
+        yield L[start:stop]
+
+
+def split_evenly(k, L):
+    """Split a list into approximately-equal parts"""
+    N = len(L)
+    if N == 0:
+        return
+    k = int(k)
+    if k <= 1:
+        yield L
+        return
+    start = 0
+    for i in range(1, k):
+        stop = (N * i + k - 1) // k
+        if stop != start:
+            yield L[start:stop]
+        start = stop
+    if stop != N:
+        yield L[stop:]
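+
+
+# Editor's illustration (not part of the original commit) of how the two
+# helpers compose; the values follow from the definitions above:
+#
+#     >>> normalize_chunksize("1 KiB", itemsize=8)  # 1024 bytes / 8-byte items
+#     128
+#     >>> list(partition(3, list(range(8))))  # evenly=True rebalances to 3-3-2
+#     [[0, 1, 2], [3, 4, 5], [6, 7]]
+#     >>> list(partition(4, list(range(8))))  # exact fit: plain 4-4 slices
+#     [[0, 1, 2, 3], [4, 5, 6, 7]]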
diff --git a/graphblas_algorithms/nxapi/cluster.py b/graphblas_algorithms/nxapi/cluster.py
index 29d4695..425fd09 100644
--- a/graphblas_algorithms/nxapi/cluster.py
+++ b/graphblas_algorithms/nxapi/cluster.py
@@ -5,6 +5,8 @@
 from graphblas_algorithms.classes.graph import to_undirected_graph
 from graphblas_algorithms.utils import not_implemented_for
 
+from ._utils import normalize_chunksize, partition
+
 __all__ = [
     "triangles",
     "transitivity",
@@ -90,11 +92,11 @@ def _split(L, k):
 
 
 # TODO: should this move into algorithms?
-def _square_clustering_split(G, node_ids=None, *, nsplits):
+def _square_clustering_split(G, node_ids=None, *, chunksize):
     if node_ids is None:
         node_ids, _ = G._A.reduce_rowwise(monoid.any).to_coo(values=False)
     result = None
-    for chunk_ids in _split(node_ids, nsplits):
+    for chunk_ids in partition(chunksize, node_ids):
         res = algorithms.square_clustering(G, chunk_ids)
         if result is None:
             result = res
@@ -103,36 +105,32 @@ def _square_clustering_split(G, node_ids=None, *, nsplits):
     return result
 
 
-def square_clustering(G, nodes=None, *, nsplits="auto"):
-    # `nsplits` is used to split the computation into chunks.
+def square_clustering(G, nodes=None, *, chunksize="256 MiB"):
+    # `chunksize` is used to split the computation into chunks.
     # square_clustering computes `A @ A`, which can get very large, even dense.
-    # The default `nsplits` is to choose the number so that `Asubset @ A`
+    # The default `chunksize` is chosen so that `Asubset @ A`
     # will be about 256 MB if dense.
     G = to_undirected_graph(G)
     if len(G) == 0:
         return {}
-    if nsplits == "auto":
-        # TODO: make a utility function for this that can be reused
-        # Also, should we use `chunksize` instead of `nsplits`?
-        targetsize = 256 * 1024 * 1024  # 256 MB
-        nsplits = len(G) ** 2 * G._A.dtype.np_type.itemsize // targetsize
-        if nsplits <= 1:
-            nsplits = None
+
+    chunksize = normalize_chunksize(chunksize, len(G) * G._A.dtype.np_type.itemsize, len(G))
+
     if nodes is None:
         # Should we use this one for subsets of nodes as well?
-        if nsplits is None:
+        if chunksize is None:
             result = algorithms.square_clustering(G)
         else:
-            result = _square_clustering_split(G, nsplits=nsplits)
+            result = _square_clustering_split(G, chunksize=chunksize)
         return G.vector_to_nodemap(result, fill_value=0)
     if nodes in G:
         idx = G._key_to_id[nodes]
         return algorithms.single_square_clustering(G, idx)
     ids = G.list_to_ids(nodes)
-    if nsplits is None:
+    if chunksize is None:
         result = algorithms.square_clustering(G, ids)
     else:
-        result = _square_clustering_split(G, ids, nsplits=nsplits)
+        result = _square_clustering_split(G, ids, chunksize=chunksize)
     return G.vector_to_nodemap(result)
diff --git a/graphblas_algorithms/nxapi/shortest_paths/weighted.py b/graphblas_algorithms/nxapi/shortest_paths/weighted.py
index 1c5e759..d6bf1d2 100644
--- a/graphblas_algorithms/nxapi/shortest_paths/weighted.py
+++ b/graphblas_algorithms/nxapi/shortest_paths/weighted.py
@@ -1,6 +1,7 @@
 from graphblas_algorithms import algorithms
 from graphblas_algorithms.classes.digraph import to_graph
 
+from .._utils import normalize_chunksize, partition
 from ..exception import NetworkXUnbounded, NodeNotFound
 
 __all__ = [
@@ -9,18 +10,14 @@
 ]
 
 
-def all_pairs_bellman_ford_path_length(G, weight="weight", *, chunksize="auto"):
+def all_pairs_bellman_ford_path_length(G, weight="weight", *, chunksize="10 MiB"):
     # Larger chunksize offers more parallelism, but uses more memory.
     # Chunksize indicates for how many source nodes to compute at one time.
     # The default is to choose the number of rows so the result, if dense,
     # will be about 10MB.
     G = to_graph(G, weight=weight)
-    if chunksize == "auto":
-        # TODO: make a utility function for this that can be reused
-        targetsize = 10 * 1024 * 1024  # 10 MB
-        chunksize = max(1, targetsize // (len(G) * G._A.dtype.np_type.itemsize))
-
-    if chunksize is None or chunksize <= 0 or chunksize >= len(G):
+    chunksize = normalize_chunksize(chunksize, len(G) * G._A.dtype.np_type.itemsize, len(G))
+    if chunksize is None:
         # All at once
         try:
             D = algorithms.bellman_ford_path_lengths(G)
@@ -35,12 +32,7 @@ def all_pairs_bellman_ford_path_length(G, weight="weight", *, chunksize="auto"):
                 raise NetworkXUnbounded(*e.args) from e
             yield (source, G.vector_to_nodemap(d))
     else:
-        # We should probably make a utility function for chunking
-        nodes = list(G)
-        for start, stop in zip(
-            range(0, len(nodes), chunksize), range(chunksize, len(nodes) + chunksize, chunksize)
-        ):
-            cur_nodes = nodes[start:stop]
+        for cur_nodes in partition(chunksize, list(G)):
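+            # Editor's note (not in the original commit): each `cur_nodes`
+            # chunk is a list of source nodes; e.g. for 100_000 float64 nodes,
+            # the "10 MiB" default above normalizes to
+            # round(10 * 1024**2 / (100_000 * 8)) == 13 sources per call.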
assert normalize_chunksize("1_0 B") == 10 + assert normalize_chunksize("1e1") == 10 + assert normalize_chunksize("1e-2 kb") == 10 + assert normalize_chunksize("Mb") == 1000**2 + assert normalize_chunksize(" mb") == 1000**2 + assert normalize_chunksize("gib") == 1024**3 + with pytest.raises(TypeError, match="chunksize must be"): + normalize_chunksize(object()) + with pytest.raises(ValueError, match="as a bytes"): + normalize_chunksize("10 badbytes") + with pytest.raises(ValueError, match="as a number"): + normalize_chunksize("1bad0 TB") + with pytest.raises(TypeError, match="N argument is required"): + normalize_chunksize("10 chunks") diff --git a/pyproject.toml b/pyproject.toml index eec83f8..f1e4472 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -199,6 +199,7 @@ ignore = [ "PLR0913", # Too many arguments to function call "PLR0915", # Too many statements "PLR2004", # Magic number used in comparison, consider replacing magic with a constant variable + "PLW2901", # Outer for loop variable ... overwritten by inner assignment target (Note: good advice, but too strict) "RET502", # Do not implicitly `return None` in function able to return non-`None` value "RET503", # Missing explicit `return` at the end of function able to return non-`None` value "RET504", # Unnecessary variable assignment before `return` statement