Skip to content

Plugin System

Framework for extending Monet Stats with custom metrics and functionality.

Plugin system architecture for extending statistical metrics (Aero Protocol Compliant).

This module provides the infrastructure to register and execute custom statistical metrics as plugins, maintaining consistency with the core Monet Stats API.

CustomMetric

Bases: PluginInterface

Wrapper for user-defined functions to act as statistical plugins.

This class implements the PluginInterface, allowing arbitrary functions to be integrated into the Monet Stats ecosystem with proper backend handling and provenance tracking.

Source code in src/monet_stats/plugin_system.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
class CustomMetric(PluginInterface):
    """
    Wrapper for user-defined functions to act as statistical plugins.

    This class implements the PluginInterface, allowing arbitrary functions
    to be integrated into the Monet Stats ecosystem with proper
    backend handling and provenance tracking.
    """

    def __init__(self, name: str, description: str, func: Callable) -> None:
        """
        Initialize the CustomMetric.

        Parameters
        ----------
        name : str
            Human-readable name of the metric.
        description : str
            Detailed description of what the metric calculates.
        func : Callable
            The underlying computation function.
        """
        self._name = name
        self._description = description
        self._func = func

    def name(self) -> str:
        """
        Return the metric name.

        Returns
        -------
        str
            The metric name.
        """
        return self._name

    def description(self) -> str:
        """
        Return the metric description.

        Returns
        -------
        str
            The metric description.
        """
        return self._description

    def compute(
        self,
        obs: Union[np.ndarray, xr.DataArray],
        mod: Union[np.ndarray, xr.DataArray],
        **kwargs: Any,
    ) -> Union[float, np.ndarray, xr.DataArray]:
        """
        Compute the custom metric with Aero Protocol enhancements.

        This implementation ensures that if Xarray DataArrays are provided,
        they are aligned, and the scientific history is updated.

        Parameters
        ----------
        obs : Union[np.ndarray, xr.DataArray]
            Observed values.
        mod : Union[np.ndarray, xr.DataArray]
            Model values.
        **kwargs : Any
            Additional arguments passed to the underlying function.

        Returns
        -------
        Union[float, np.ndarray, xr.DataArray]
            The computed metric.
        """
        if isinstance(obs, xr.DataArray) and isinstance(mod, xr.DataArray):
            obs, mod = xr.align(obs, mod, join="inner")
            res = self._func(obs, mod, **kwargs)
            return _update_history(res, self._name)

        return self._func(obs, mod, **kwargs)

    def validate_inputs(
        self,
        obs: Union[np.ndarray, xr.DataArray],
        mod: Union[np.ndarray, xr.DataArray],
        **kwargs: Any,
    ) -> bool:
        """
        Validate inputs for compatibility (metadata-only check for laziness).

        Parameters
        ----------
        obs : Union[np.ndarray, xr.DataArray]
            Observed values.
        mod : Union[np.ndarray, xr.DataArray]
            Model values.
        **kwargs : Any
            Additional arguments.

        Returns
        -------
        bool
            True if inputs are compatible.
        """
        if not (isinstance(obs, (np.ndarray, xr.DataArray)) and isinstance(mod, (np.ndarray, xr.DataArray))):
            return False

        if hasattr(obs, "shape") and hasattr(mod, "shape"):
            try:
                np.broadcast_shapes(obs.shape, mod.shape)
            except ValueError:
                return False

        return True

__init__(name, description, func)

Initialize the CustomMetric.

Parameters

name : str Human-readable name of the metric. description : str Detailed description of what the metric calculates. func : Callable The underlying computation function.

Source code in src/monet_stats/plugin_system.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def __init__(self, name: str, description: str, func: Callable) -> None:
    """
    Initialize the CustomMetric.

    Parameters
    ----------
    name : str
        Human-readable name of the metric.
    description : str
        Detailed description of what the metric calculates.
    func : Callable
        The underlying computation function.
    """
    self._name = name
    self._description = description
    self._func = func

compute(obs, mod, **kwargs)

Compute the custom metric with Aero Protocol enhancements.

This implementation ensures that if Xarray DataArrays are provided, they are aligned, and the scientific history is updated.

Parameters

obs : Union[np.ndarray, xr.DataArray] Observed values. mod : Union[np.ndarray, xr.DataArray] Model values. **kwargs : Any Additional arguments passed to the underlying function.

Returns

Union[float, np.ndarray, xr.DataArray] The computed metric.

Source code in src/monet_stats/plugin_system.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def compute(
    self,
    obs: Union[np.ndarray, xr.DataArray],
    mod: Union[np.ndarray, xr.DataArray],
    **kwargs: Any,
) -> Union[float, np.ndarray, xr.DataArray]:
    """
    Compute the custom metric with Aero Protocol enhancements.

    This implementation ensures that if Xarray DataArrays are provided,
    they are aligned, and the scientific history is updated.

    Parameters
    ----------
    obs : Union[np.ndarray, xr.DataArray]
        Observed values.
    mod : Union[np.ndarray, xr.DataArray]
        Model values.
    **kwargs : Any
        Additional arguments passed to the underlying function.

    Returns
    -------
    Union[float, np.ndarray, xr.DataArray]
        The computed metric.
    """
    if isinstance(obs, xr.DataArray) and isinstance(mod, xr.DataArray):
        obs, mod = xr.align(obs, mod, join="inner")
        res = self._func(obs, mod, **kwargs)
        return _update_history(res, self._name)

    return self._func(obs, mod, **kwargs)

description()

Return the metric description.

Returns

str The metric description.

Source code in src/monet_stats/plugin_system.py
157
158
159
160
161
162
163
164
165
166
def description(self) -> str:
    """
    Return the metric description.

    Returns
    -------
    str
        The metric description.
    """
    return self._description

name()

Return the metric name.

Returns

str The metric name.

Source code in src/monet_stats/plugin_system.py
146
147
148
149
150
151
152
153
154
155
def name(self) -> str:
    """
    Return the metric name.

    Returns
    -------
    str
        The metric name.
    """
    return self._name

validate_inputs(obs, mod, **kwargs)

Validate inputs for compatibility (metadata-only check for laziness).

Parameters

obs : Union[np.ndarray, xr.DataArray] Observed values. mod : Union[np.ndarray, xr.DataArray] Model values. **kwargs : Any Additional arguments.

Returns

bool True if inputs are compatible.

Source code in src/monet_stats/plugin_system.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def validate_inputs(
    self,
    obs: Union[np.ndarray, xr.DataArray],
    mod: Union[np.ndarray, xr.DataArray],
    **kwargs: Any,
) -> bool:
    """
    Validate inputs for compatibility (metadata-only check for laziness).

    Parameters
    ----------
    obs : Union[np.ndarray, xr.DataArray]
        Observed values.
    mod : Union[np.ndarray, xr.DataArray]
        Model values.
    **kwargs : Any
        Additional arguments.

    Returns
    -------
    bool
        True if inputs are compatible.
    """
    if not (isinstance(obs, (np.ndarray, xr.DataArray)) and isinstance(mod, (np.ndarray, xr.DataArray))):
        return False

    if hasattr(obs, "shape") and hasattr(mod, "shape"):
        try:
            np.broadcast_shapes(obs.shape, mod.shape)
        except ValueError:
            return False

    return True

ExampleMetrics

Standard implementations of extended metrics as plugins.

Source code in src/monet_stats/plugin_system.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
class ExampleMetrics:
    """
    Standard implementations of extended metrics as plugins.
    """

    @staticmethod
    def wmape_plugin() -> CustomMetric:
        """
        Create a Weighted Mean Absolute Percentage Error (WMAPE) plugin.

        WMAPE = (sum(|mod - obs|) / sum(|obs|)) * 100

        Returns
        -------
        CustomMetric
            A configured WMAPE plugin.
        """

        def wmape_func(
            obs: Union[np.ndarray, xr.DataArray],
            mod: Union[np.ndarray, xr.DataArray],
            axis: Optional[Union[int, str]] = None,
        ) -> Union[float, np.ndarray, xr.DataArray]:
            # Xarray/Dask friendly implementation
            if isinstance(obs, xr.DataArray):
                numerator = (np.abs(mod - obs)).sum(dim=axis)
                denominator = (np.abs(obs)).sum(dim=axis)
            else:
                numerator = np.sum(np.abs(mod - obs), axis=axis)
                denominator = np.sum(np.abs(obs), axis=axis)
            return (numerator / denominator) * 100.0

        return CustomMetric(
            name="WMAPE",
            description="Weighted Mean Absolute Percentage Error",
            func=wmape_func,
        )

    @staticmethod
    def mape_bias_plugin() -> CustomMetric:
        """
        Create a MAPE Bias plugin.

        Calculates the difference between average positive and average negative percentage errors.

        Returns
        -------
        CustomMetric
            A configured MAPE Bias plugin.
        """

        def mape_bias_func(
            obs: Union[np.ndarray, xr.DataArray],
            mod: Union[np.ndarray, xr.DataArray],
            axis: Optional[Union[int, str]] = None,
        ) -> Union[float, np.ndarray, xr.DataArray]:
            pe = (mod - obs) / np.abs(obs)

            if isinstance(pe, xr.DataArray):
                pos_errors = pe.where(pe >= 0, 0).mean(dim=axis)
                neg_errors = pe.where(pe < 0, 0).abs().mean(dim=axis)
            else:
                pos_errors = np.mean(np.where(pe >= 0, pe, 0), axis=axis)
                neg_errors = np.mean(np.where(pe < 0, np.abs(pe), 0), axis=axis)

            return pos_errors - neg_errors

        return CustomMetric(
            name="MAPE_Bias",
            description="MAPE Bias - difference between positive and negative percentage errors",
            func=mape_bias_func,
        )

mape_bias_plugin() staticmethod

Create a MAPE Bias plugin.

Calculates the difference between average positive and average negative percentage errors.

Returns

CustomMetric A configured MAPE Bias plugin.

Source code in src/monet_stats/plugin_system.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
@staticmethod
def mape_bias_plugin() -> CustomMetric:
    """
    Create a MAPE Bias plugin.

    Calculates the difference between average positive and average negative percentage errors.

    Returns
    -------
    CustomMetric
        A configured MAPE Bias plugin.
    """

    def mape_bias_func(
        obs: Union[np.ndarray, xr.DataArray],
        mod: Union[np.ndarray, xr.DataArray],
        axis: Optional[Union[int, str]] = None,
    ) -> Union[float, np.ndarray, xr.DataArray]:
        pe = (mod - obs) / np.abs(obs)

        if isinstance(pe, xr.DataArray):
            pos_errors = pe.where(pe >= 0, 0).mean(dim=axis)
            neg_errors = pe.where(pe < 0, 0).abs().mean(dim=axis)
        else:
            pos_errors = np.mean(np.where(pe >= 0, pe, 0), axis=axis)
            neg_errors = np.mean(np.where(pe < 0, np.abs(pe), 0), axis=axis)

        return pos_errors - neg_errors

    return CustomMetric(
        name="MAPE_Bias",
        description="MAPE Bias - difference between positive and negative percentage errors",
        func=mape_bias_func,
    )

wmape_plugin() staticmethod

Create a Weighted Mean Absolute Percentage Error (WMAPE) plugin.

WMAPE = (sum(|mod - obs|) / sum(|obs|)) * 100

Returns

CustomMetric A configured WMAPE plugin.

Source code in src/monet_stats/plugin_system.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
@staticmethod
def wmape_plugin() -> CustomMetric:
    """
    Create a Weighted Mean Absolute Percentage Error (WMAPE) plugin.

    WMAPE = (sum(|mod - obs|) / sum(|obs|)) * 100

    Returns
    -------
    CustomMetric
        A configured WMAPE plugin.
    """

    def wmape_func(
        obs: Union[np.ndarray, xr.DataArray],
        mod: Union[np.ndarray, xr.DataArray],
        axis: Optional[Union[int, str]] = None,
    ) -> Union[float, np.ndarray, xr.DataArray]:
        # Xarray/Dask friendly implementation
        if isinstance(obs, xr.DataArray):
            numerator = (np.abs(mod - obs)).sum(dim=axis)
            denominator = (np.abs(obs)).sum(dim=axis)
        else:
            numerator = np.sum(np.abs(mod - obs), axis=axis)
            denominator = np.sum(np.abs(obs), axis=axis)
        return (numerator / denominator) * 100.0

    return CustomMetric(
        name="WMAPE",
        description="Weighted Mean Absolute Percentage Error",
        func=wmape_func,
    )

PluginManager

Manager for registering and executing statistical metric plugins.

The PluginManager allows users to extend the library with custom metrics that follow the same interface as built-in metrics.

Source code in src/monet_stats/plugin_system.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class PluginManager:
    """
    Manager for registering and executing statistical metric plugins.

    The PluginManager allows users to extend the library with custom metrics
    that follow the same interface as built-in metrics.
    """

    def __init__(self) -> None:
        """Initialize the PluginManager with an empty plugin registry."""
        self._plugins: Dict[str, PluginInterface] = {}

    def register_plugin(self, plugin: PluginInterface) -> None:
        """
        Register a new statistical metric plugin.

        Parameters
        ----------
        plugin : PluginInterface
            The plugin instance to register.
        """
        self._plugins[plugin.name()] = plugin

    def unregister_plugin(self, name: str) -> None:
        """
        Unregister a plugin by name.

        Parameters
        ----------
        name : str
            Name of the plugin to remove from the registry.
        """
        if name in self._plugins:
            del self._plugins[name]

    def get_plugin(self, name: str) -> Optional[PluginInterface]:
        """
        Retrieve a registered plugin by its name.

        Parameters
        ----------
        name : str
            The name of the plugin to retrieve.

        Returns
        -------
        Optional[PluginInterface]
            The plugin instance if found, otherwise None.
        """
        return self._plugins.get(name)

    def list_plugins(self) -> List[str]:
        """
        List the names of all currently registered plugins.

        Returns
        -------
        List[str]
            A list of registered plugin names.
        """
        return list(self._plugins.keys())

    def compute_metric(
        self,
        name: str,
        obs: Union[np.ndarray, xr.DataArray],
        mod: Union[np.ndarray, xr.DataArray],
        **kwargs: Any,
    ) -> Union[float, np.ndarray, xr.DataArray]:
        """
        Execute a registered plugin to compute a metric.

        Parameters
        ----------
        name : str
            Name of the registered plugin to execute.
        obs : Union[np.ndarray, xr.DataArray]
            Observed values.
        mod : Union[np.ndarray, xr.DataArray]
            Model/predicted values.
        **kwargs : Any
            Additional keyword arguments for the metric computation.

        Returns
        -------
        Union[float, np.ndarray, xr.DataArray]
            The computed metric result.

        Raises
        ------
        ValueError
            If the plugin name is not registered or if input validation fails.
        """
        plugin = self.get_plugin(name)
        if plugin is None:
            raise ValueError(f"Plugin '{name}' not found in the registry.")

        if not plugin.validate_inputs(obs, mod, **kwargs):
            raise ValueError(f"Input validation failed for plugin '{name}'.")

        return plugin.compute(obs, mod, **kwargs)

__init__()

Initialize the PluginManager with an empty plugin registry.

Source code in src/monet_stats/plugin_system.py
25
26
27
def __init__(self) -> None:
    """Initialize the PluginManager with an empty plugin registry."""
    self._plugins: Dict[str, PluginInterface] = {}

compute_metric(name, obs, mod, **kwargs)

Execute a registered plugin to compute a metric.

Parameters

name : str Name of the registered plugin to execute. obs : Union[np.ndarray, xr.DataArray] Observed values. mod : Union[np.ndarray, xr.DataArray] Model/predicted values. **kwargs : Any Additional keyword arguments for the metric computation.

Returns

Union[float, np.ndarray, xr.DataArray] The computed metric result.

Raises

ValueError If the plugin name is not registered or if input validation fails.

Source code in src/monet_stats/plugin_system.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def compute_metric(
    self,
    name: str,
    obs: Union[np.ndarray, xr.DataArray],
    mod: Union[np.ndarray, xr.DataArray],
    **kwargs: Any,
) -> Union[float, np.ndarray, xr.DataArray]:
    """
    Execute a registered plugin to compute a metric.

    Parameters
    ----------
    name : str
        Name of the registered plugin to execute.
    obs : Union[np.ndarray, xr.DataArray]
        Observed values.
    mod : Union[np.ndarray, xr.DataArray]
        Model/predicted values.
    **kwargs : Any
        Additional keyword arguments for the metric computation.

    Returns
    -------
    Union[float, np.ndarray, xr.DataArray]
        The computed metric result.

    Raises
    ------
    ValueError
        If the plugin name is not registered or if input validation fails.
    """
    plugin = self.get_plugin(name)
    if plugin is None:
        raise ValueError(f"Plugin '{name}' not found in the registry.")

    if not plugin.validate_inputs(obs, mod, **kwargs):
        raise ValueError(f"Input validation failed for plugin '{name}'.")

    return plugin.compute(obs, mod, **kwargs)

get_plugin(name)

Retrieve a registered plugin by its name.

Parameters

name : str The name of the plugin to retrieve.

Returns

Optional[PluginInterface] The plugin instance if found, otherwise None.

Source code in src/monet_stats/plugin_system.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def get_plugin(self, name: str) -> Optional[PluginInterface]:
    """
    Retrieve a registered plugin by its name.

    Parameters
    ----------
    name : str
        The name of the plugin to retrieve.

    Returns
    -------
    Optional[PluginInterface]
        The plugin instance if found, otherwise None.
    """
    return self._plugins.get(name)

list_plugins()

List the names of all currently registered plugins.

Returns

List[str] A list of registered plugin names.

Source code in src/monet_stats/plugin_system.py
68
69
70
71
72
73
74
75
76
77
def list_plugins(self) -> List[str]:
    """
    List the names of all currently registered plugins.

    Returns
    -------
    List[str]
        A list of registered plugin names.
    """
    return list(self._plugins.keys())

register_plugin(plugin)

Register a new statistical metric plugin.

Parameters

plugin : PluginInterface The plugin instance to register.

Source code in src/monet_stats/plugin_system.py
29
30
31
32
33
34
35
36
37
38
def register_plugin(self, plugin: PluginInterface) -> None:
    """
    Register a new statistical metric plugin.

    Parameters
    ----------
    plugin : PluginInterface
        The plugin instance to register.
    """
    self._plugins[plugin.name()] = plugin

unregister_plugin(name)

Unregister a plugin by name.

Parameters

name : str Name of the plugin to remove from the registry.

Source code in src/monet_stats/plugin_system.py
40
41
42
43
44
45
46
47
48
49
50
def unregister_plugin(self, name: str) -> None:
    """
    Unregister a plugin by name.

    Parameters
    ----------
    name : str
        Name of the plugin to remove from the registry.
    """
    if name in self._plugins:
        del self._plugins[name]

register_builtin_plugins()

Register built-in example plugins to the global manager.

Source code in src/monet_stats/plugin_system.py
314
315
316
317
def register_builtin_plugins() -> None:
    """Register built-in example plugins to the global manager."""
    plugin_manager.register_plugin(ExampleMetrics.wmape_plugin())
    plugin_manager.register_plugin(ExampleMetrics.mape_bias_plugin())