diff --git a/doc/release_notes.rst b/doc/release_notes.rst index 7e849a42..fb5cd3e2 100644 --- a/doc/release_notes.rst +++ b/doc/release_notes.rst @@ -21,6 +21,10 @@ Upcoming Version * ``add_variables(binary=True, ...)`` now accepts ``lower``/``upper`` bounds, as long as they are 0 or 1. Previously binary bounds could only be set via the ``.lower``/``.upper`` setters after creation. (https://github.com/PyPSA/linopy/issues/776) +**Performance** + +* ``LinearExpression.groupby(...).sum()`` now scatters terms directly into the padded result arrays instead of unstacking through pandas ``MultiIndex`` machinery, cutting peak memory to input + result and speeding up the grouping. + **Deprecations** * Mutation via assignment to ``Variable.lower`` / ``Variable.upper`` / ``Constraint.coeffs`` / ``Constraint.vars`` / ``Constraint.lhs`` / ``Constraint.sign`` / ``Constraint.rhs`` is deprecated and emits a ``DeprecationWarning``. Use ``Variable.update(...)`` / ``Constraint.update(...)`` instead — the canonical mutation API with one validation path and one place that flips the persistent-solver dirty flag. Read access to these properties is unchanged. The setters will be removed in a future release. diff --git a/linopy/expressions.py b/linopy/expressions.py index ea8588d2..a3b10ba8 100644 --- a/linopy/expressions.py +++ b/linopy/expressions.py @@ -340,20 +340,19 @@ def sum( # At this point, group is always a pandas Series assert isinstance(group, pd.Series) - group_dim = group.index.name - - arrays = [group, group.groupby(group).cumcount()] - idx = pd.MultiIndex.from_arrays(arrays, names=[GROUP_DIM, GROUPED_TERM_DIM]) - new_coords = Coordinates.from_pandas_multiindex(idx, group_dim) - # collapsing group_dim invalidates every coordinate aligned to it - names_to_drop = [ - name - for name, coord in self.data.coords.items() - if group_dim in coord.dims - ] - ds = self.data.drop_vars(names_to_drop).assign_coords(new_coords) - ds = ds.unstack(group_dim, fill_value=LinearExpression._fill_value) - ds = LinearExpression._sum(ds, dim=GROUPED_TERM_DIM) + + numpy_backed = all( + isinstance(self.data[k].data, np.ndarray) + for k in ("coeffs", "vars", "const") + ) + if numpy_backed: + ds = self._sum_by_scatter(group) + else: + logger.debug( + "groupby-sum: non-numpy-backed (e.g. dask) data, " + "falling back to the unstack kernel." + ) + ds = self._sum_by_unstack(group) if int_map is not None: index = ds.indexes[GROUP_DIM].map({v: k for k, v in int_map.items()}) @@ -374,6 +373,98 @@ def func(ds: Dataset) -> Dataset: return self.map(func, **kwargs, shortcut=True) + def _sum_by_scatter(self, group: pd.Series) -> Dataset: + """ + Sum groups by scattering all terms directly into the final padded arrays. + + Every group member keeps its block of ``nterm`` terms, so the resulting + term dimension has size ``max_group_size * nterm`` and smaller groups are + padded with fill values. In contrast to :meth:`_sum_by_unstack` only the + result arrays are allocated, without intermediate copies of that size. + + Only the term and constant values are computed with numpy; the result + structure (dimensions, coordinates and their order) is assembled by + xarray itself and thereby matches the result of unstacking the group + dimension. The caller dispatches here only for numpy-backed data + (chunked data uses :meth:`_sum_by_unstack`). + """ + data = self.data + group_dim = group.index.name + fill_value = LinearExpression._fill_value + + codes, unique_groups = pd.factorize(group, sort=True) + if (codes == -1).any(): + raise ValueError( + "Cannot group by a pandas object containing NaN values. " + "Drop or fill the corresponding entries before grouping." + ) + + n_groups = len(unique_groups) + sizes = np.bincount(codes, minlength=n_groups) + max_size = int(sizes.max()) if n_groups else 0 + + # position of each element within its group (order of appearance) + positions = pd.Series(codes).groupby(codes).cumcount().to_numpy() + + def scatter( + da: DataArray, fill: Any + ) -> tuple[tuple[Hashable, ...], np.ndarray]: + """Scatter one term-array into its padded (group x term) layout.""" + rest_dims = [d for d in da.dims if d not in (group_dim, TERM_DIM)] + values = da.transpose(group_dim, *rest_dims, TERM_DIM).values + rest_shape = values.shape[1:-1] + nterm = values.shape[-1] + + out = np.full( + (n_groups, *rest_shape, nterm, max_size), fill, dtype=values.dtype + ) + locs = (codes, *(slice(None),) * (len(rest_shape) + 1), positions) + out[locs] = values + # collapsing (nterm, max_size) into one axis keeps all terms of one + # group member together, with padding at the end of each block + out = out.reshape((n_groups, *rest_shape, nterm * max_size)) + return (GROUP_DIM, *rest_dims, TERM_DIM), out + + coeffs_dims, coeffs = scatter(data.coeffs, fill_value["coeffs"]) + vars_dims, vars = scatter(data.vars, fill_value["vars"]) + + # constants are summed up within each group, skipping NaN values + const_dims = [d for d in data.const.dims if d != group_dim] + const_values = data.const.transpose(group_dim, *const_dims).values + const = np.zeros((n_groups, *const_values.shape[1:]), dtype=const_values.dtype) + np.add.at(const, codes, np.where(np.isnan(const_values), 0, const_values)) + + structure = data.drop_vars(["coeffs", "vars", "const"]) + structure = structure.drop_dims(group_dim) + structure = structure.expand_dims({GROUP_DIM: unique_groups}) + + return structure.assign( + coeffs=(coeffs_dims, coeffs), + vars=(vars_dims, vars), + const=((GROUP_DIM, *const_dims), const), + ) + + def _sum_by_unstack(self, group: pd.Series) -> Dataset: + """ + Sum groups by unstacking the group dimension into a padded helper + dimension and summing over it. + + Equivalent to :meth:`_sum_by_scatter`, but goes through xarray's + unstack/stack machinery. It is the fallback for chunked (dask) data, + which cannot be scattered into preallocated numpy buffers. + """ + group_dim = group.index.name + arrays = [group, group.groupby(group).cumcount()] + idx = pd.MultiIndex.from_arrays(arrays, names=[GROUP_DIM, GROUPED_TERM_DIM]) + new_coords = Coordinates.from_pandas_multiindex(idx, group_dim) + # collapsing group_dim invalidates every coordinate aligned to it + names_to_drop = [ + name for name, coord in self.data.coords.items() if group_dim in coord.dims + ] + ds = self.data.drop_vars(names_to_drop).assign_coords(new_coords) + ds = ds.unstack(group_dim, fill_value=LinearExpression._fill_value) + return LinearExpression._sum(ds, dim=GROUPED_TERM_DIM) + def roll(self, **kwargs: Any) -> LinearExpression: """ Roll the groupby object. diff --git a/test/test_linear_expression.py b/test/test_linear_expression.py index 5ffd7de1..20032351 100644 --- a/test/test_linear_expression.py +++ b/test/test_linear_expression.py @@ -7,7 +7,10 @@ from __future__ import annotations +import logging import warnings +from collections.abc import Callable +from types import SimpleNamespace from typing import Any import numpy as np @@ -1908,6 +1911,190 @@ def test_linear_expression_groupby_from_variable(v: Variable) -> None: assert grouped.nterm == 10 +@pytest.fixture +def scatter_ctx() -> SimpleNamespace: + """Shared 60-element building blocks for the scatter-vs-unstack case table.""" + m = Model() + rng = np.random.default_rng(0) + idx = pd.RangeIndex(60, name="elem") + other = pd.Index(list("abc"), name="other") + p, q = pd.Index(list("pq"), name="p"), pd.Index([10, 20, 30], name="q") + a, b = pd.Index(range(12), name="a"), pd.Index(list("vwxyz"), name="b") + + const = xr.DataArray(rng.normal(size=(3, 60)), coords=[other, idx]) + y = m.add_variables(coords=[other, idx], name="y") + yab = m.add_variables(coords=[a, b], name="yab") + stacked = LinearExpression((2 * yab + 1 * yab).data.stack(elem=["a", "b"]), m) + skewed = pd.Series(rng.choice(8, 60, p=[0.5] + [0.5 / 7] * 7), index=idx, name="g") + + return SimpleNamespace( + m=m, + x=m.add_variables(coords=[idx], name="x"), + y=y, + y3=m.add_variables(coords=[p, q, idx], name="y3"), + mx=m.add_variables( + coords=[idx], name="mx", mask=xr.DataArray(np.arange(60) % 4 != 0, [idx]) + ), + my=m.add_variables( + coords=[other, idx], + name="my", + mask=xr.DataArray(rng.random((3, 60)) > 0.25, [other, idx]), + ), + const=const, + nan_const=const.where(rng.random((3, 60)) > 0.3), + nan_vec=np.where(np.arange(60) % 3, np.nan, 5.0), + y_aux=(2 * y + 1 * y).assign_coords( + carrier=("elem", rng.choice(list("PQ"), 60)), + tag=(("other", "elem"), rng.integers(0, 9, (3, 60))), + ), + stacked=stacked, + skewed=skewed, + one_group=pd.Series(1, index=idx, name="g"), + identity=pd.Series(np.arange(60), index=idx, name="g"), + mi_groups=skewed.set_axis(stacked.data.indexes["elem"]), + ) + + +# Each case maps a structure to (expr, groups) from `scatter_ctx`. The skewed +# group puts ~half the elements in group 0 and spreads 1..7 over the rest. +SCATTER_EQUALS_UNSTACK_CASES = { + "skewed_int_groups": lambda c: (3 * c.x - 2 * c.x + 7, c.skewed), + "multidim_with_const": lambda c: (2 * c.y + 1 * c.y + c.const, c.skewed), + "nan_const": lambda c: (1 * c.x + c.nan_vec, c.skewed), + "masked_vars": lambda c: (1 * c.mx, c.skewed), + "quadratic": lambda c: (c.x * c.x + 2 * c.x, c.skewed), + "single_group": lambda c: (1 * c.x, c.one_group), + "identity_groups": lambda c: (1 * c.x, c.identity), + # combined structures exercising several features at once + "multidim_const_nan": lambda c: ( + 2 * c.y - 3 * c.y + 1 * c.y + c.nan_const, + c.skewed, + ), + "three_dims": lambda c: (4 * c.y3 + 1 * c.y3, c.skewed), + "quadratic_multidim_const": lambda c: (c.y * c.y + 2 * c.y + c.const, c.skewed), + "masked_multidim": lambda c: (5 * c.my - 2 * c.my, c.skewed), + # both collapse the grouped dim, dropping every coordinate tied to it + "aux_coords_on_group_dim": lambda c: (c.y_aux, c.skewed), + "multiindex_dim": lambda c: (c.stacked, c.mi_groups), +} + + +class TestGroupbySumScatterKernel: + """ + ``groupby(...).sum()`` takes a scatter fast path (``_sum_by_scatter``) for + numpy-backed expressions and falls back to the xarray unstack machinery + (``_sum_by_unstack``) for chunked data and exotic coordinates. These tests + pin the two kernels together and cover the structural edge cases. + """ + + @staticmethod + def _assert_kernels_identical(gb: Any, groups: pd.Series, m: Model) -> None: + """Force both kernels and assert they produce the same expression.""" + scatter = LinearExpression(gb._sum_by_scatter(groups).rename(_group="g"), m) + unstack = LinearExpression(gb._sum_by_unstack(groups).rename(_group="g"), m) + + assert scatter.data.coeffs.dims == unstack.data.coeffs.dims + assert scatter.data.const.dims == unstack.data.const.dims + assert list(scatter.data.coords) == list(unstack.data.coords) + for name in scatter.data.coords: + assert_equal(scatter.data[name], unstack.data[name]) + + np.testing.assert_array_equal(scatter.vars.values, unstack.vars.values) + np.testing.assert_array_equal(scatter.coeffs.values, unstack.coeffs.values) + # constants may differ only by floating-point summation order + np.testing.assert_allclose( + scatter.const.values, unstack.const.values, rtol=1e-12 + ) + + def test_skewed_unsorted_groups(self, v: Variable) -> None: + """ + The scatter-based fast path must match the xarray fallback for groups + that are unsorted, non-contiguous and of very different sizes. + """ + expr = 2 * v + 5 + # 'b' appears 14 times, 'c' 5 times, 'a' once, scattered over the dimension + labels = ["b"] * 4 + ["c", "a"] + ["b"] * 5 + ["c"] * 4 + ["b"] * 5 + groups = pd.Series(labels, index=v.indexes["dim_2"], name="letter") + + grouped = expr.groupby(groups).sum() + fallback = expr.groupby(groups.to_xarray()).sum(use_fallback=True) + + assert list(grouped.data.letter) == ["a", "b", "c"] + # padded to the largest group times the number of terms of the input + assert grouped.nterm == 14 * expr.nterm + assert_linequal(grouped, fallback) + + # every group carries exactly the variables of its members, rest is fill + for letter in ["a", "b", "c"]: + members = np.where(np.array(labels) == letter)[0] + vars_of_group = grouped.data.vars.sel(letter=letter).values + present = set(vars_of_group[vars_of_group >= 0]) + assert present == set(v.labels.values[members]) + assert (vars_of_group >= 0).sum() == len(members) * expr.nterm + assert grouped.const.sel(letter=letter).item() == 5 * len(members) + + def test_chunked_uses_unstack( + self, v: Variable, caplog: pytest.LogCaptureFixture + ) -> None: + """Chunked (dask-backed) expressions group via xarray's unstack path.""" + pytest.importorskip("dask") + expr = 2 * v + 5 + groups = pd.Series([1] * 12 + [2] * 8, index=v.indexes["dim_2"], name="group") + + chunked = LinearExpression(expr.data.chunk({"dim_2": 5}), expr.model) + with caplog.at_level(logging.DEBUG, logger="linopy.expressions"): + grouped_chunked = chunked.groupby(groups).sum() + assert "falling back to the unstack kernel" in caplog.text + + grouped = expr.groupby(groups).sum() + assert grouped_chunked.nterm == grouped.nterm + assert_linequal( + LinearExpression(grouped_chunked.data.compute(), expr.model), grouped + ) + + def test_nan_groups_raise(self, v: Variable) -> None: + expr = 1 * v + groups = pd.Series( + [1.0, np.nan] * 10, index=v.indexes["dim_2"], name="with_nans" + ) + with pytest.raises(ValueError, match="NaN"): + expr.groupby(groups).sum() + + def test_empty_groups(self) -> None: + """An empty group dimension scatters into an empty, well-formed result.""" + m = Model() + idx = pd.RangeIndex(0, name="elem") + x = m.add_variables(coords=[idx], name="x") + groups = pd.Series([], index=idx, name="g", dtype=int) + + grouped = (1 * x).groupby(groups).sum() + assert grouped.nterm == 0 + assert dict(grouped.data.sizes) == {"g": 0, "_term": 0} + + @pytest.mark.parametrize( + "build", + SCATTER_EQUALS_UNSTACK_CASES.values(), + ids=SCATTER_EQUALS_UNSTACK_CASES.keys(), + ) + def test_scatter_equals_unstack( + self, + build: Callable[[SimpleNamespace], tuple[LinearExpression, pd.Series]], + scatter_ctx: SimpleNamespace, + ) -> None: + """ + Lock the two groupby-sum kernels together. + + The fast path scatters terms into numpy arrays (``_sum_by_scatter``); + the unstack implementation (``_sum_by_unstack``) is kept for chunked + data. Both must stay interchangeable — if an xarray/pandas update + changes the unstack output or an edge case diverges, this fails. See + ``SCATTER_EQUALS_UNSTACK_CASES`` for the structures covered. + """ + expr, groups = build(scatter_ctx) + gb = expr.groupby(groups) + self._assert_kernels_identical(gb, groups, scatter_ctx.m) + + def test_linear_expression_rolling(v: Variable) -> None: expr = 1 * v rolled = expr.rolling(dim_2=2).sum()