"""Ridge regressors for Koopman/Transfer operator learning."""
# Authors: The kooplearn developers
# SPDX-License-Identifier: MIT
import logging
from numbers import Integral, Real
import numpy as np
import scipy
from sklearn.base import BaseEstimator
from sklearn.metrics import r2_score
from sklearn.utils._param_validation import Interval, StrOptions
from sklearn.utils.validation import check_is_fitted, validate_data
from kooplearn._linalg import covariance
from kooplearn._utils import fuzzy_parse_complex
from kooplearn.linear_model import _regressors
from kooplearn.structs import DynamicalModes
logger = logging.getLogger("kooplearn")
[docs]
class Ridge(BaseEstimator):
r"""Linear model minimizing the :math:`L^{2}` loss.
Implements a model approximating the Koopman (deterministic systems) or Transfer (stochastic
systems) operator by lifting the state with a *nonlinear* feature map and then minimizing the
:math:`L^{2}` loss in the embedded space as described in :cite:t:`ridge-Kostic2022`.
.. tip::
The dynamical modes obtained by calling
:class:`kooplearn.linear_model.Ridge.dynamical_modes` correspond to the *Extended Dynamical
Mode Decomposition* by :cite:t:`ridge-Williams2015_EDMD`.
Parameters
----------
n_components : int or None, default=None
Number of components to retain. If ``None``, all components are used.
lag_time : int, default=1
Time delay between the pairs of snapshots :math:`(X_t, X_{t + \text{lag_time}})` used to
train the estimator.
reduced_rank : bool, default=True
Whether to use reduced-rank regression introduced in
:cite:t:`ridge-Kostic2022`. If ``False``, initializes the classical
principal component estimator.
alpha : float or None, default=1e-6
Tikhonov (ridge) regularization coefficient. ``None`` is equivalent to
``alpha = 0``, and internally calls specialized stable
algorithms to deal with this specific case.
eigen_solver : {'auto', 'dense', 'arpack', 'randomized'}, default='auto'
Solver used to perform the internal SVD calculations. If ``n_components``
is much less than the number of training samples, ``randomized`` (or
``arpack`` to a smaller extent) may be more efficient than the ``dense`` solver.
auto :
the solver is selected automatically based on the number of samples and components:
if the number of components to extract is less than 10 (strict) and
the number of samples is more than 200 (strict), the ``arpack``
method is enabled. Otherwise the exact full eigenvalue
decomposition is computed and optionally truncated afterwards
(``dense`` method).
dense :
run exact full eigenvalue decomposition calling the standard
LAPACK solver via ``scipy.linalg.eigh``, and select the components
by postprocessing.
arpack :
run SVD truncated to ``n_components`` calling ARPACK solver using
``scipy.sparse.linalg.eigsh``. It requires strictly
``0 < n_components < n_samples``.
randomized :
run randomized SVD as described in :cite:t:`ridge-turri2023randomized`.
tol : float, default=0
Convergence tolerance for arpack.
If 0, optimal value is chosen automatically by arpack.
max_iter : int or None, default=None
Maximum number of iterations for arpack.
If ``None``, optimal value is chosen automatically by arpack.
iterated_power : {'auto'} or int, default='auto'
Number of iterations for the power method computed by
``eigen_solver == 'randomized'``. When ``'auto'``, it is set to 7 when
``n_components < 0.1 * min(X.shape)``, otherwise it is set to 4.
n_oversamples : int, default=5
Number of oversamples when using a randomized algorithm
(``eigen_solver == 'randomized'``).
random_state : int, RandomState instance or None, default=None
Used when ``eigen_solver`` is ``'arpack'`` or ``'randomized'``. Pass an int
for reproducible results across multiple function calls.
copy_X : bool, default=True
If ``True``, input X is copied and stored by the model in the ``X_fit_``
attribute. If no further changes will be done to X, setting
``copy_X=False`` saves memory by storing a reference.
Attributes
----------
X_fit_ : ndarray of shape (n_samples, n_features)
The data used to fit the model. If ``copy_X=False``, then ``X_fit_`` is
a reference to the original data.
y_fit_ : ndarray of shape (n_samples, ...) or None
The observable used to fit the model. If no observable is provided during fitting,
this attribute is ``None``.
cov_X_ : ndarray of shape (n_features, n_features)
Covariance matrix evaluated at the initial states, that is :math:`x_t`.
cov_Y_ : ndarray of shape (n_features, n_features)
Covariance matrix evaluated at the evolved states, that is :math:`x_{t+1}`.
cov_XY_ : ndarray of shape (n_features, n_features)
Cross-covariance matrix between initial and evolved states.
U_ : ndarray of shape (n_features, n_components)
Projection matrix. The evolution operator is approximated as
:math:`U U^T \mathrm{cov_{XY}}`.
estimator_ : ndarray of shape (n_features, n_features)
Least-squares estimator :math:`U U^T \mathrm{cov_{XY}}`.
rank_ : int
Effective rank of the fitted estimator.
Examples
--------
.. code-block:: python
>>> from kooplearn.datasets import make_linear_system
>>> from kooplearn.linear_model import Ridge
>>> import numpy as np
>>>
>>> # Generate a linear system
>>> A = np.array([[0.9, 0.1], [-0.1, 0.9]])
>>> X0 = np.array([1.0, 0.0])
>>> data = make_linear_system(X0, A, n_steps=100, noise=0.1, random_state=42).to_numpy()
>>>
>>> # Fit the model
>>> model = Ridge(n_components=2, alpha=1e-3)
>>> model = model.fit(data)
>>>
>>> # Predict the future state
>>> pred = model.predict(data)
>>>
>>> # Get the eigenvalues of the Koopman operator
>>> eigvals = model.eig()
"""
_parameter_constraints: dict = {
"n_components": [
Interval(Integral, 1, None, closed="left"),
None,
],
"lag_time": [
Interval(Integral, 1, None, closed="left"),
],
"reduced_rank": ["boolean"],
"alpha": [
[Interval(Real, 0, None, closed="left")],
None,
],
"eigen_solver": [StrOptions({"auto", "dense", "arpack", "randomized"})],
"tol": [Interval(Real, 0, None, closed="left")],
"max_iter": [
Interval(Integral, 1, None, closed="left"),
None,
],
"iterated_power": [
Interval(Integral, 0, None, closed="left"),
StrOptions({"auto"}),
],
"n_oversamples": [Interval(Integral, 1, None, closed="left")],
"random_state": ["random_state"],
"copy_X": ["boolean"],
}
def __init__(
self,
n_components=None,
*,
lag_time=1,
reduced_rank=True,
alpha=1e-6,
eigen_solver="auto",
tol=0,
max_iter=None,
iterated_power="auto",
n_oversamples=5,
random_state=None,
copy_X=True,
):
self.n_components = n_components
self.lag_time = lag_time
self.reduced_rank = reduced_rank
self.alpha = alpha
self.eigen_solver = eigen_solver
self.tol = tol
self.max_iter = max_iter
self.iterated_power = iterated_power
self.n_oversamples = n_oversamples
self.random_state = random_state
self.copy_X = copy_X
[docs]
def fit(self, X, y=None):
"""
Fit the linear model using the selected algorithm.
Depending on the model parameters, this method fits the estimator using
either a randomized or non-randomized algorithm, and either a full-rank
or reduced-rank regression approach.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Training trajectory data.
y : ndarray of shape (n_samples, n_features_out), default=None
Optional observable used for training.
If ``None``, the observable is assumed to be the state itself.
Returns
-------
self : object
Fitted model instance.
"""
self._pre_fit_checks(X)
if y is not None:
if y.shape[0] == X.shape[0]:
self.y_fit_ = y
else:
raise ValueError(
f"y has {y.shape[0]} samples, but X has {X.shape[0]}. "
"Both must have the same number of samples."
)
else:
self.y_fit_ = None
# Adjust number of components
if self.n_components is None:
n_components = self.cov_X_.shape[0]
else:
n_components = min(self.cov_X_.shape[0], self.n_components)
# Choose eigen solver
if self.eigen_solver == "auto":
if self.cov_X_.shape[0] > 200 and n_components < 10:
eigen_solver = "arpack"
else:
eigen_solver = "dense"
else:
eigen_solver = self.eigen_solver
# Adjust regularization parameter
if self.alpha is None:
alpha = 0.0
else:
alpha = self.alpha
# Set iterated power
if self.iterated_power == "auto":
iterated_power = 7 if n_components < 0.1 * min(X.shape) else 4
else:
iterated_power = self.iterated_power
if self.reduced_rank:
if eigen_solver == "randomized":
if alpha == 0.0:
raise ValueError(
"Tikhonov regularization must be specified when "
"solver is randomized."
)
fit_result = _regressors.rand_reduced_rank(
self.cov_X_,
self.cov_XY_,
alpha,
n_components,
self.n_oversamples,
iterated_power,
self.random_state,
)
else:
fit_result = _regressors.reduced_rank(
self.cov_X_,
self.cov_XY_,
alpha,
n_components,
eigen_solver,
self.tol,
self.max_iter,
self.random_state,
)
else:
if eigen_solver == "randomized":
fit_result = _regressors.rand_pcr(
self.cov_X_,
alpha,
n_components,
self.n_oversamples,
self.iterated_power,
self.random_state,
)
else:
fit_result = _regressors.pcr(
self.cov_X_,
alpha,
n_components,
eigen_solver,
self.tol,
self.max_iter,
self.random_state,
)
self._fit_result = fit_result
self.U_, _, self._spectral_biases = fit_result.values()
self.rank_ = self.U_.shape[1]
assert self.U_.shape[1] <= n_components
if self.U_.shape[1] < n_components:
logger.warning(
f"Warning: The fitting algorithm discarded {n_components - self.U_.shape[1]} " \
"dimensions of the {n_components} requested out of numerical instabilities." \
"\nThe rank attribute has been updated to {self.U_.shape[1]}.\nConsider " \
"decreasing the rank parameter."
)
n_components = self.U_.shape[1]
self.estimator_ = self.U_ @ self.U_.T @ self.cov_XY_
logger.info(f"Fitted {self.__class__.__name__} model.")
return self
[docs]
def predict(self, X, n_steps=1, observable=False) -> np.ndarray:
r"""
Predict the system state or its expected value after ``n_steps``.
Computes the predicted state — or, in the case of a stochastic system,
the expected value :math:`\mathbb{E}[X_t \mid X_0 = X]` — after
``t = n_steps`` time steps given the initial conditions ``X``.
If ``observable=True``, returns the corresponding predicted observable
instead of the state.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Initial conditions used for prediction.
n_steps : int, default=1
Number of future time steps to predict. Only the final predicted state
(at time ``t = n_steps``) is returned.
observable : bool, default=False
If ``True``, returns the predicted observable at time :math:`t`
instead of the system state.
Returns
-------
ndarray of shape (n_samples, n_features)
Predicted (expected) state or observable at time :math:`t = n_steps`.
"""
check_is_fitted(self)
X = validate_data(self, X, reset=False, copy=self.copy_X)
X_fit, X_lagged_fit = self._split_trajectory(self.X_fit_)
if observable:
if self.y_fit_ is not None:
_, observable_fit = self._split_trajectory(self.y_fit_)
else:
raise ValueError(
"Observable should be passed when calling fit as the y parameter."
)
else:
observable_fit = X_lagged_fit
pred = _regressors.predict(
n_steps,
self._fit_result,
self.cov_XY_,
X,
X_fit,
observable_fit,
)
return pred
[docs]
def risk(self, X=None):
"""Compute the estimator risk.
Parameters
----------
X : ndarray or None, default=None
Trajectory used to evaluate the risk. If None, evaluates on
training data.
Returns
-------
float
Risk of the estimator.
"""
check_is_fitted(self)
if X is not None:
X = validate_data(self, X, reset=False, copy=self.copy_X)
if X.shape[0] < 1 + self.lag_time:
raise ValueError(
f"X has only {X.shape[0]} samples, but at least"
f"{1 + self.lag_time} are required."
)
X_val, Y_val = self._split_trajectory(X)
cov_Xv, cov_Yv, cov_XYv = self._init_covs(X_val, Y_val)
else:
cov_Xv, cov_Yv, cov_XYv = self.cov_X_, self.cov_Y_, self.cov_XY_
return _regressors.estimator_risk(
self._fit_result, cov_Xv, cov_Yv, cov_XYv, self.cov_XY_
)
[docs]
def eig(self, eval_left_on=None, eval_right_on=None):
"""
Compute the eigendecomposition of the learned evolution operator.
This method returns the eigenvalues of the estimated evolution
operator, and optionally evaluates the corresponding left and/or right
eigenfunctions on user-provided data.
Parameters
----------
eval_left_on : ndarray of shape (n_samples, n_features), optional
Data points on which to evaluate the **left** eigenfunctions.
If ``None``, left eigenfunctions are not evaluated.
eval_right_on : ndarray of shape (n_samples, n_features), optional
Data points on which to evaluate the **right** eigenfunctions.
If ``None``, right eigenfunctions are not evaluated.
Returns
-------
eigenvalues : ndarray of shape (n_components,)
Eigenvalues of the estimated operator.
left_eigenfunctions : ndarray of shape (n_samples, n_components), optional
Values of the left eigenfunctions evaluated on ``eval_left_on``.
Returned only if ``eval_left_on`` is provided.
right_eigenfunctions : ndarray of shape (n_samples, n_components), optional
Values of the right eigenfunctions evaluated on ``eval_right_on``.
Returned only if ``eval_right_on`` is provided.
"""
check_is_fitted(self)
eig_result = _regressors.eig(self._fit_result, self.cov_XY_)
if eval_left_on is None and eval_right_on is None:
# (eigenvalues,)
return eig_result["values"]
elif eval_left_on is None and eval_right_on is not None:
# (eigenvalues, right eigenfunctions)
return eig_result["values"], _regressors.evaluate_eigenfunction(
eig_result, "right", eval_right_on
)
elif eval_left_on is not None and eval_right_on is None:
# (eigenvalues, left eigenfunctions)
return eig_result["values"], _regressors.evaluate_eigenfunction(
eig_result, "left", eval_left_on
)
elif eval_left_on is not None and eval_right_on is not None:
# (eigenvalues, left eigenfunctions, right eigenfunctions)
return (
eig_result["values"],
_regressors.evaluate_eigenfunction(eig_result, "left", eval_left_on),
_regressors.evaluate_eigenfunction(eig_result, "right", eval_right_on),
)
[docs]
def dynamical_modes(self, X, observable=False) -> DynamicalModes:
"""
Compute the mode decomposition of arbitrary observables of the
evolution operator at the states defined by ``X``. If
:math:`(\\lambda_i, \\xi_i, \\psi_i)_{i = 1}^{r}` are eigentriplets of the evolution
operator, for any observable :math:`f` the i-th mode of :math:`f` at :math:`x` is defined
as: :math:`\\lambda_i \\langle \\xi_i, f \\rangle \\psi_i(x)`.
See :cite:t:`ridge-Kostic2022` for more details.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
States at which to evaluate the modes.
observable : bool, default=False
If ``True``, computes the modes of the observable rather than those
of the system state.
Returns
-------
DynamicalModes
Object containing the eigenvalues, modes, and their projections.
See :class:`kooplearn.structs.DynamicalModes` for details.
"""
check_is_fitted(self)
X = validate_data(self, X, reset=False, copy=self.copy_X)
if X.shape[0] < 1 + self.lag_time:
raise ValueError(
f"X has only {X.shape[0]} samples, but at least "
f"{1 + self.lag_time} are required."
)
X_fit, Y_fit = self._split_trajectory(self.X_fit_)
# Compute eigendecomposition
U = self._fit_result["U"]
M = np.linalg.multi_dot([U.T, self.cov_XY_, U])
values, lv, rv = scipy.linalg.eig(M, left=True, right=True)
values = fuzzy_parse_complex(values)
r_perm = np.argsort(values)
l_perm = np.argsort(values.conj())
values = values[r_perm]
# Normalization in RKHS norm
rv = (U @ rv)[:, r_perm]
rv /= np.linalg.norm(rv, axis=0)
# Biorthogonalization
lv_full = np.linalg.multi_dot([self.cov_XY_.T, U, lv])
lv_full = lv_full[:, l_perm]
lv = lv[:, l_perm]
# Compute correct orthogonalization for the left projection
l_norm = np.sum(lv_full * rv, axis=0)
lv = lv / l_norm
# Initial conditions
right_eigenfunctions = (X @ rv) / X_fit.shape[0] # [num_init_conditions, rank]
if observable:
if self.y_fit_ is not None:
_, observable_fit = self._split_trajectory(self.y_fit_)
else:
raise ValueError(
"Observable should be passed when calling fit as the y parameter."
)
else:
observable_fit = Y_fit
left_projections = (
np.linalg.multi_dot([X_fit / X_fit.shape[0], U, lv]).T @ observable_fit
)
dmd = DynamicalModes(values, right_eigenfunctions, left_projections)
return dmd
[docs]
def score(
self, X=None, y=None, n_steps=1, observable=False, metric=r2_score, **metric_kws
) -> float:
"""
Score the model predictions for timestep ``n_steps``.
Computes the ``metric`` (default is ``sklearn.metrics.r2_score``) evaluated between
the model predictions at timestep ``n_steps`` and the true system state (or observable,
if ``observable=True``) at the same timestep.
Parameters:
X : ndarray of shape (n_samples, n_features) or None, default=None
Trajectory used to compute the score. If None, evaluates on
training data.
y : ndarray of shape (n_samples, n_features) or None, default=None
Optional observable used to compute the score.
n_steps : int, default=1
Number of future time steps on which to compute the score. Only the
predictions at the final timestep (``t = n_steps``) are compared
to the true system state (or observable).
observable : bool, default=False
If ``True``, returns the predicted observable at time :math:`t`
instead of the system state.
metric: callable (default=r2_score)
The metric function used to score the model predictions.
metric_kws: dict
Optional parameters to pass to the metric function.
Returns:
score: float
Metric function value for the model predictions at the next timestep.
"""
check_is_fitted(self)
# Case 1: Using training data
if X is None:
X_test, Y_test = self._split_trajectory(self.X_fit_)
if observable:
if self.y_fit_ is not None:
_, target = self._split_trajectory(self.y_fit_)
else:
raise ValueError(
"Cannot score on observable: no training observable was provided during" \
"fit."
)
else:
target = Y_test
# Case 2: Using provided test data
else:
X = validate_data(self, X, reset=False, copy=self.copy_X)
if X.shape[0] < 1 + self.lag_time:
raise ValueError(
f"X has only {X.shape[0]} samples, but at least "
f"{1 + self.lag_time} are required."
)
X_test, Y_test = self._split_trajectory(X)
if observable:
if y is None:
raise ValueError(
"When observable=True and X is provided, y must contain the corresponding" \
"observable values."
)
if y.shape[0] != X.shape[0]:
raise ValueError(
f"y has {y.shape[0]} samples, but X has {X.shape[0]}. "
"Both must have the same number of samples."
)
_, target = self._split_trajectory(y)
else:
target = Y_test
# Make predictions and align timestamps
pred = self.predict(X_test, n_steps=n_steps, observable=observable)
if n_steps > 1:
target = target[n_steps - 1 :]
pred = pred[: -(n_steps - 1)]
return metric(target, pred, **metric_kws)
def _svals(self):
"""Singular values of the Koopman/Transfer operator.
Returns:
The estimated singular values of the Koopman/Transfer operator. Array of shape
`(n_components,)`.
"""
check_is_fitted(self)
return _regressors.svdvals(self._fit_result, self.cov_XY_)
def _init_covs(self, X, Y):
"""
Initializes the covariance matrices `cov_X`, `cov_Y`, and `cov_XY`.
Args:
X (np.ndarray):
Feature map evaluated at the initial training steps, shape
``(n_samples, features_shape)``.
Y (np.ndarray):
Feature map evaluated a the evolved training steps, shape
``(n_samples, features_shape)``.
Returns:
A tuple containing:
- ``cov_X`` (np.ndarray): Covariance matrix of the feature map evaluated at X,
shape ``(n_features, n_features)``.
- ``cov_Y`` (np.ndarray): Covariance matrix of the feature map evaluated at Y,
shape ``(n_features, n_features)``.
- ``cov_XY`` (np.ndarray): Cross-covariance matrix of the feature map evaluated at X
and Y, shape ``(n_features, n_features)``.
"""
cov_X = covariance(X)
cov_Y = covariance(Y)
cov_XY = covariance(X, Y)
return cov_X, cov_Y, cov_XY
def _pre_fit_checks(self, X):
"""Performs pre-fit checks on the training data.
Initialize the covariance matrices and saves the training data.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Training trajectory data, where `n_samples` is the number of
samples and `n_features` is the number of features.
"""
X = validate_data(self, X, copy=self.copy_X)
if X.shape[0] < 1 + self.lag_time:
raise ValueError(
f"X has only {X.shape[0]} samples, but at least "
f"{1 + self.lag_time} are required."
)
self.X_fit_ = X
X_fit, Y_fit = self._split_trajectory(X)
self.cov_X_, self.cov_Y_, self.cov_XY_ = self._init_covs(X_fit, Y_fit)
def _split_trajectory(self, X):
"""Split a trajectory into context and target pairs."""
return X[: -self.lag_time], X[self.lag_time :]
def __sklearn_tags__(self):
tags = super().__sklearn_tags__()
tags.target_tags.required = False
tags.requires_fit = True
tags.non_deterministic = (
self.eigen_solver == "randomized" or self.eigen_solver == "arpack"
)
return tags