Skip to content

Performance

Performance optimization and analysis utilities for statistical computations.

Performance optimization utilities for statistical computations.

apply_lazy_threshold(data, threshold_mb=500.0, force_dask=False)

Ensure data is lazy (Dask-backed) if it exceeds a certain size (Aero Protocol).

Parameters

data : xarray.DataArray or xarray.Dataset Input data to check. threshold_mb : float, optional Size threshold in Megabytes. Default is 500.0. force_dask : bool, optional If True, always convert to Dask regardless of size. Default is False.

Returns

xarray.DataArray or xarray.Dataset DataArray or Dataset, potentially converted to Dask.

Examples

import xarray as xr import numpy as np da = xr.DataArray(np.random.rand(1000, 1000), dims=['x', 'y']) # ~7.6 MB da_lazy = apply_lazy_threshold(da, threshold_mb=1.0) # Will convert to Dask

Source code in src/monet_stats/performance.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def apply_lazy_threshold(
    data: Union[xr.DataArray, xr.Dataset],
    threshold_mb: float = 500.0,
    force_dask: bool = False,
) -> Union[xr.DataArray, xr.Dataset]:
    """
    Ensure data is lazy (Dask-backed) if it exceeds a certain size (Aero Protocol).

    Parameters
    ----------
    data : xarray.DataArray or xarray.Dataset
        Input data to check.
    threshold_mb : float, optional
        Size threshold in Megabytes. Default is 500.0.
    force_dask : bool, optional
        If True, always convert to Dask regardless of size. Default is False.

    Returns
    -------
    xarray.DataArray or xarray.Dataset
        DataArray or Dataset, potentially converted to Dask.

    Examples
    --------
    >>> import xarray as xr
    >>> import numpy as np
    >>> da = xr.DataArray(np.random.rand(1000, 1000), dims=['x', 'y']) # ~7.6 MB
    >>> da_lazy = apply_lazy_threshold(da, threshold_mb=1.0) # Will convert to Dask
    """
    # Determine whether the data is already Dask-backed. A DataArray is lazy
    # when its underlying array exposes non-None ``chunks``; a Dataset counts
    # as lazy if any of its variables does.
    already_lazy = False
    if isinstance(data, xr.DataArray):
        already_lazy = getattr(data.data, "chunks", None) is not None
    elif isinstance(data, xr.Dataset):
        already_lazy = any(
            getattr(data[name].data, "chunks", None) is not None
            for name in data.data_vars
        )

    # Lazy data passes through unchanged unless a re-chunk is forced.
    if already_lazy and not force_dask:
        return data

    size_mb = data.nbytes / (1024 * 1024)

    if not (force_dask or size_mb > threshold_mb):
        # Small enough to stay eager.
        return data

    if not _has_dask():
        # Laziness was requested but Dask is unavailable; warn and fall back.
        warnings.warn(
            f"Laziness requested (force_dask={force_dask} or size={size_mb:.2f}MB > threshold={threshold_mb}MB), "
            "but Dask is not installed. Continuing with eager computation.",
            UserWarning,
            stacklevel=2,
        )
        return data

    chunk_plan = get_chunk_recommendation(data)
    if not already_lazy:
        # Only announce the conversion when the data was eager to begin with.
        warnings.warn(
            f"Data size ({size_mb:.2f} MB) exceeds threshold ({threshold_mb:.2f} MB). "
            f"Converting to Dask with recommended chunks: {chunk_plan}",
            UserWarning,
            stacklevel=2,
        )
    converted = data.chunk(chunk_plan)
    return _update_history(converted, f"Converted to Dask (threshold={threshold_mb}MB)")

chunk_array(arr, chunk_size=1000000)

Split array into chunks for memory-efficient processing (Aero Protocol).

Parameters

arr : numpy.ndarray Input array to chunk. chunk_size : int, optional Size of each chunk (number of elements).

Returns

list List of array chunks.

Source code in src/monet_stats/performance.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def chunk_array(arr: np.ndarray, chunk_size: int = 1000000) -> list:
    """
    Split array into chunks for memory-efficient processing (Aero Protocol).

    Parameters
    ----------
    arr : numpy.ndarray
        Input array to chunk.
    chunk_size : int, optional
        Size of each chunk (number of elements).

    Returns
    -------
    list
        List of array chunks.
    """
    total = arr.size
    if total == 0:
        return []

    # Splitting happens along the first axis so each chunk keeps the array's
    # dimensionality. The chunk count is derived from the *total* element
    # count (not the length of the leading axis) for backward compatibility
    # with the original implementation, so multi-dimensional inputs with
    # arr.size > len(arr) may produce empty sub-arrays.
    n_parts = max(1, int(np.ceil(total / chunk_size)))
    return np.array_split(arr, n_parts)

fast_mae(obs, mod, axis=None)

Fast computation of Mean Absolute Error.

Alias for error_metrics.MAE.

Parameters

obs : numpy.ndarray or xarray.DataArray Observed values. mod : numpy.ndarray or xarray.DataArray Model or predicted values. axis : int, str, or iterable, optional Axis along which to compute MAE.

Returns

Any Mean absolute error.

Source code in src/monet_stats/performance.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def fast_mae(
    obs: Union[np.ndarray, xr.DataArray],
    mod: Union[np.ndarray, xr.DataArray],
    axis: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
) -> Any:
    """
    Fast computation of Mean Absolute Error.

    Alias for error_metrics.MAE.

    Parameters
    ----------
    obs : numpy.ndarray or xarray.DataArray
        Observed values.
    mod : numpy.ndarray or xarray.DataArray
        Model or predicted values.
    axis : int, str, or iterable, optional
        Axis along which to compute MAE.

    Returns
    -------
    Any
        Mean absolute error.
    """
    from .error_metrics import MAE

    result = MAE(obs, mod, axis=axis)
    # Provenance is recorded only on xarray-like results (objects carrying
    # both ``attrs`` and ``coords``); plain scalars/ndarrays pass through.
    if hasattr(result, "attrs") and hasattr(result, "coords"):
        result = _update_history(result, "fast_mae")
    return result

fast_rmse(obs, mod, axis=None)

Fast computation of Root Mean Square Error.

Alias for error_metrics.RMSE.

Parameters

obs : numpy.ndarray or xarray.DataArray Observed values. mod : numpy.ndarray or xarray.DataArray Model or predicted values. axis : int, str, or iterable, optional Axis along which to compute RMSE.

Returns

Any Root mean square error.

Source code in src/monet_stats/performance.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def fast_rmse(
    obs: Union[np.ndarray, xr.DataArray],
    mod: Union[np.ndarray, xr.DataArray],
    axis: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
) -> Any:
    """
    Fast computation of Root Mean Square Error.

    Alias for error_metrics.RMSE.

    Parameters
    ----------
    obs : numpy.ndarray or xarray.DataArray
        Observed values.
    mod : numpy.ndarray or xarray.DataArray
        Model or predicted values.
    axis : int, str, or iterable, optional
        Axis along which to compute RMSE.

    Returns
    -------
    Any
        Root mean square error.
    """
    from .error_metrics import RMSE

    result = RMSE(obs, mod, axis=axis)
    # Provenance is recorded only on xarray-like results (objects carrying
    # both ``attrs`` and ``coords``); plain scalars/ndarrays pass through.
    if hasattr(result, "attrs") and hasattr(result, "coords"):
        result = _update_history(result, "fast_rmse")
    return result

get_chunk_recommendation(data, target_mb=100.0, chunk_dim=None)

Recommend chunk sizes for an xarray object to target a specific chunk size (Aero Protocol).

Parameters

data : xarray.DataArray or xarray.Dataset Input data to analyze. target_mb : float, optional Target size for each chunk in Megabytes. Default is 100.0. chunk_dim : str, optional Specific dimension to chunk along. If None, it tries to chunk the largest dimension.

Returns

Dict[str, int] Recommended chunk sizes dictionary.

Notes

Aero Protocol: Targets ~100MB per chunk by default for optimal performance.

Examples

import xarray as xr import numpy as np data_array = xr.DataArray(np.random.rand(1000, 1000), dims=['x', 'y']) get_chunk_recommendation(data_array, target_mb=1.0) {'x': 125, 'y': 1000}

Source code in src/monet_stats/performance.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def get_chunk_recommendation(
    data: Union[xr.DataArray, xr.Dataset],
    target_mb: float = 100.0,
    chunk_dim: Optional[str] = None,
) -> Dict[str, int]:
    """
    Recommend chunk sizes for an xarray object to target a specific chunk size (Aero Protocol).

    Parameters
    ----------
    data : xarray.DataArray or xarray.Dataset
        Input data to analyze.
    target_mb : float, optional
        Target size for each chunk in Megabytes. Default is 100.0.
    chunk_dim : str, optional
        Specific dimension to chunk along. If None, it tries to chunk the largest dimension.

    Returns
    -------
    Dict[str, int]
        Recommended chunk sizes dictionary.

    Notes
    -----
    Aero Protocol: Targets ~100MB per chunk by default for optimal performance.

    Examples
    --------
    >>> import xarray as xr
    >>> import numpy as np
    >>> data_array = xr.DataArray(np.random.rand(1000, 1000), dims=['x', 'y'])
    >>> get_chunk_recommendation(data_array, target_mb=1.0)
    {'x': 125, 'y': 1000}
    """
    if isinstance(data, xr.Dataset):
        if not data.data_vars:
            return {}
        # Sizing is estimated from the first data variable only.
        arr = data[next(iter(data.data_vars))]
    else:
        arr = data

    bytes_per_element = arr.dtype.itemsize
    n_total = arr.size

    # How many elements fit inside one target-sized chunk.
    target_elements = (target_mb * 1024 * 1024) / bytes_per_element

    if target_elements >= n_total:
        # Whole array already fits in a single chunk.
        return {str(dim): size for dim, size in arr.sizes.items()}

    recommendation = {str(dim): size for dim, size in arr.sizes.items()}

    if chunk_dim is not None:
        if chunk_dim not in arr.dims:
            raise ValueError(f"Dimension {chunk_dim} not found in data.")

        # Elements per unit length of the chosen dimension.
        per_slice = n_total / arr.sizes[chunk_dim]
        ideal = max(1, int(target_elements / per_slice))
        recommendation[chunk_dim] = min(ideal, arr.sizes[chunk_dim])
    else:
        # No dimension specified: shrink the largest dimensions first until a
        # chunk holds at most ``target_elements`` elements.
        remaining = float(n_total)
        by_size_desc = sorted(arr.sizes.items(), key=lambda kv: kv[1], reverse=True)

        for dim, size in by_size_desc:
            if remaining <= target_elements:
                break

            per_slice = remaining / size
            ideal = max(1, int(target_elements / per_slice))
            recommendation[str(dim)] = min(ideal, size)
            remaining = per_slice * recommendation[str(dim)]

    return recommendation

memory_efficient_correlation(x, y, axis=None)

Memory-efficient computation of Pearson correlation coefficient.

Alias for correlation_metrics.pearsonr.

Parameters

x : numpy.ndarray or xarray.DataArray First variable. y : numpy.ndarray or xarray.DataArray Second variable. axis : int, str, or iterable, optional Axis along which to compute correlation.

Returns

Any Pearson correlation coefficient.

Source code in src/monet_stats/performance.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def memory_efficient_correlation(
    x: Union[np.ndarray, xr.DataArray],
    y: Union[np.ndarray, xr.DataArray],
    axis: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
) -> Any:
    """
    Memory-efficient computation of Pearson correlation coefficient.

    Alias for correlation_metrics.pearsonr.

    Parameters
    ----------
    x : numpy.ndarray or xarray.DataArray
        First variable.
    y : numpy.ndarray or xarray.DataArray
        Second variable.
    axis : int, str, or iterable, optional
        Axis along which to compute correlation.

    Returns
    -------
    Any
        Pearson correlation coefficient.
    """
    from .correlation_metrics import pearsonr

    result = pearsonr(x, y, axis=axis)
    # Provenance is recorded only on xarray-like results (objects carrying
    # both ``attrs`` and ``coords``); plain scalars/ndarrays pass through.
    if hasattr(result, "attrs") and hasattr(result, "coords"):
        result = _update_history(result, "memory_efficient_correlation")
    return result

optimize_for_size(func, obs, mod, axis=None)

Optimize function computation based on data size (Aero Protocol).

Parameters

func : callable Function to optimize. obs : numpy.ndarray or xarray.DataArray Observed values. mod : numpy.ndarray or xarray.DataArray Model values. axis : int, str, or iterable, optional Axis along which to compute.

Returns

Any Optimized computation result.

Source code in src/monet_stats/performance.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def optimize_for_size(
    func: Callable,
    obs: Union[np.ndarray, xr.DataArray],
    mod: Union[np.ndarray, xr.DataArray],
    axis: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
) -> Any:
    """
    Optimize function computation based on data size (Aero Protocol).

    Parameters
    ----------
    func : callable
        Function to optimize.
    obs : numpy.ndarray or xarray.DataArray
        Observed values.
    mod : numpy.ndarray or xarray.DataArray
        Model values.
    axis : int, str, or iterable, optional
        Axis along which to compute.

    Returns
    -------
    Any
        Optimized computation result.
    """
    # xarray inputs are made lazy when large; numpy inputs pass through as-is.
    xr_types = (xr.DataArray, xr.Dataset)
    obs_ready = apply_lazy_threshold(obs) if isinstance(obs, xr_types) else obs
    mod_ready = apply_lazy_threshold(mod) if isinstance(mod, xr_types) else mod

    return func(obs_ready, mod_ready, axis=axis)

parallel_compute(func, data, chunk_size=1000000, axis=None)

Compute function in parallel using chunking strategy (Aero Protocol).

For Xarray objects, this automatically ensures laziness via apply_lazy_threshold and leverages Dask if the data is already chunked.

Parameters

func : callable Function to apply to data chunks. data : numpy.ndarray, xarray.DataArray, or xarray.Dataset Input data to process. chunk_size : int, optional Size of data chunks for processing (NumPy only). Default is 1,000,000. axis : int, str, or iterable, optional Axis along which to compute.

Returns

Any Result of parallel computation.

Source code in src/monet_stats/performance.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def parallel_compute(
    func: Callable,
    data: Union[np.ndarray, xr.DataArray, xr.Dataset],
    chunk_size: int = 1000000,
    axis: Optional[Union[int, str, Iterable[Union[int, str]]]] = None,
) -> Any:
    """
    Compute function in parallel using chunking strategy (Aero Protocol).

    For Xarray objects, this automatically ensures laziness via `apply_lazy_threshold`
    and leverages Dask if the data is already chunked.

    Parameters
    ----------
    func : callable
        Function to apply to data chunks.
    data : numpy.ndarray, xarray.DataArray, or xarray.Dataset
        Input data to process.
    chunk_size : int, optional
        Size of data chunks for processing (NumPy only). Default is 1,000,000.
    axis : int, str, or iterable, optional
        Axis along which to compute.

    Returns
    -------
    Any
        Result of parallel computation.
    """
    if isinstance(data, (xr.DataArray, xr.Dataset)):
        # Xarray path: make large inputs lazy, run, and record provenance.
        lazy_data = apply_lazy_threshold(data)
        return _update_history(func(lazy_data, axis=axis), "parallel_compute")

    # NumPy path: small arrays are computed eagerly without any chunking.
    arr = np.asanyarray(data)
    if arr.size <= chunk_size:
        return func(arr, axis=axis)

    try:
        import dask.array as da
    except ImportError:
        # Without Dask there is nothing to parallelize; run eagerly.
        return func(arr, axis=axis)

    # Let Dask pick chunk shapes for N-D arrays ('auto'); use the caller's
    # chunk_size directly for the 1-D case.
    chunk_spec = "auto" if arr.ndim > 1 else chunk_size
    outcome = func(da.from_array(arr, chunks=chunk_spec), axis=axis)

    # Materialize dask-backed results before returning.
    return outcome.compute() if hasattr(outcome, "compute") else outcome

vectorize_function(func, *args, **kwargs)

Apply function in a vectorized manner (Aero Protocol).

.. note:: This uses np.vectorize, which is a convenience wrapper and not necessarily high-performance as it often involves an internal loop. Prefer true NumPy/Xarray vectorization where possible.

Parameters

func : callable Function to vectorize. *args : tuple Arguments to pass to function. **kwargs : dict Keyword arguments to pass to function.

Returns

Any Result of vectorized function application.

Source code in src/monet_stats/performance.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def vectorize_function(func: Callable, *args: Any, **kwargs: Any) -> Any:
    """
    Apply function in a vectorized manner (Aero Protocol).

    .. note::
        This uses `np.vectorize`, which is a convenience wrapper and not
        necessarily high-performance as it often involves an internal loop.
        Prefer true NumPy/Xarray vectorization where possible.

    Parameters
    ----------
    func : callable
        Function to vectorize.
    *args : tuple
        Arguments to pass to function.
    **kwargs : dict
        Keyword arguments to pass to function.

    Returns
    -------
    Any
        Result of vectorized function application.
    """
    vectorized = np.vectorize(func)
    return vectorized(*args, **kwargs)