"""Kernel methods for Koopman/Transfer operator learning."""
# Authors: The kooplearn developers
# SPDX-License-Identifier: MIT
import logging
from numbers import Integral, Real
from numpy import ndarray
from sklearn.base import BaseEstimator
from sklearn.metrics import r2_score
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.utils._param_validation import Interval, StrOptions
from sklearn.utils.validation import check_is_fitted, validate_data
from kooplearn.kernel import _regressors
from kooplearn.structs import DynamicalModes
logger = logging.getLogger("kooplearn")
[docs]
class KernelRidge(BaseEstimator):
r"""
Kernel model minimizing the :math:`L^{2}` loss.
Implements a model approximating the Koopman (deterministic systems) or
Transfer (stochastic systems) operator by lifting the state with a
*infinite-dimensional nonlinear* feature map associated to a kernel
:math:`k` and then minimizing the :math:`L^{2}` loss in the embedded space
as described in :cite:t:`kernelridge-Kostic2022`.
.. tip::
The dynamical modes obtained by calling
:class:`kooplearn.kernel.KernelRidge.dynamical_modes` correspond to the *Kernel
Dynamical Mode Decomposition* by :cite:t:`kernelridge-Williams2015_KDMD`.
Parameters
----------
n_components : int or None, default=None
Number of components to retain. If ``None``, all components are used.
lag_time : int, default=1
Time delay between the pairs of snapshots
:math:`(X_t, X_{t + \text{lag_time}})` used to train the estimator.
reduced_rank : bool, default=True
Whether to use reduced-rank regression introduced in
:cite:t:`Kostic2022`. If ``False``, initializes the classical
principal component estimator.
kernel : {'linear', 'poly', 'rbf', 'sigmoid', 'cosine'} \
or callable, default='linear'
Kernel function to use, or a callable that returns a Gram matrix.
gamma : float or None, default=None
Kernel coefficient for ``rbf``, ``poly``, and ``sigmoid`` kernels.
Ignored by other kernels. If ``None``, it is set to ``1 / n_features``.
degree : float, default=3
Degree for polynomial kernels. Ignored by other kernels.
coef0 : float, default=1
Independent term in polynomial and sigmoid kernels.
Ignored by other kernels.
kernel_params : dict or None, default=None
Parameters (keyword arguments) passed to a callable kernel.
Ignored by other kernels.
alpha : float, default=1e-6
Tikhonov (ridge) regularization coefficient. ``None`` is equivalent to
``alpha = 0``, and internally calls specialized stable
algorithms to deal with this specific case.
eigen_solver : {'auto', 'dense', 'arpack', 'randomized'}, default='auto'
Solver used to perform the internal SVD calculations. If ``n_components``
is much less than the number of training samples, ``randomized`` (or
``arpack`` to a smaller extent) may be more efficient than the ``dense`` solver.
auto :
the solver is selected automatically based on the number of samples and components:
if the number of components to extract is less than 10 (strict) and
the number of samples is more than 200 (strict), the ``arpack``
method is enabled. Otherwise the exact full eigenvalue
decomposition is computed and optionally truncated afterwards
(``dense`` method).
dense :
run exact full eigenvalue decomposition calling the standard
LAPACK solver via ``scipy.linalg.eigh``, and select the components
by postprocessing.
arpack :
run SVD truncated to ``n_components`` calling ARPACK solver using
``scipy.sparse.linalg.eigsh``. It requires strictly
``0 < n_components < n_samples``.
randomized :
run randomized SVD as described in :cite:t:`kernelridge-turri2023randomized`.
tol : float, default=0
Convergence tolerance for arpack.
If 0, optimal value is chosen automatically by arpack.
max_iter : int or None, default=None
Maximum number of iterations for arpack.
If ``None``, optimal value is chosen automatically by arpack.
iterated_power : {'auto'} or int, default='auto'
Number of iterations for the power method computed by
``eigen_solver == 'randomized'``. When ``'auto'``, it is set to 7 when
``n_components < 0.1 * min(X.shape)``, otherwise it is set to 4.
n_oversamples : int, default=5
Number of oversamples when using a randomized algorithm
(``eigen_solver == 'randomized'``).
optimal_sketching : bool, default=False
Sketching strategy for the randomized solver. If `True` performs
optimal sketching (computationally expensive but more accurate).
random_state : int, RandomState instance or None, default=None
Used when ``eigen_solver`` is ``'arpack'`` or ``'randomized'``. Pass an int
for reproducible results across multiple function calls.
copy_X : bool, default=True
If ``True``, input X is copied and stored by the model in the ``X_fit_``
attribute. If no further changes will be done to X, setting
``copy_X=False`` saves memory by storing a reference.
n_jobs : int or None, default=None
Number of parallel jobs to run.
``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
``-1`` uses all available processors.
Attributes
----------
X_fit_ : ndarray of shape (n_samples, n_features)
The data used to fit the model. If ``copy_X=False``, then ``X_fit_`` is
a reference to the original data.
y_fit_ : ndarray of shape (n_samples, ...) or None
The observable used to fit the model. If no observable is provided during fitting,
this attribute is ``None``.
gamma_ : float
Effective kernel coefficient for RBF, polynomial, and sigmoid kernels.
When ``gamma`` is explicitly provided, this is the same as ``gamma``.
When ``gamma`` is ``None``, this is the inferred value.
kernel_X_ : ndarray of shape (n_samples, n_samples)
Kernel matrix evaluated at the initial states.
kernel_Y_ : ndarray of shape (n_samples, n_samples)
Kernel matrix evaluated at the evolved states.
kernel_YX_ : ndarray of shape (n_samples, n_samples)
Cross-kernel matrix between evolved and initial states.
U_ : ndarray of shape (n_samples, rank)
Left projection matrix of the operator approximation:
:math:`k(\cdot, X) U V^\top k(\cdot, Y)`
(see :cite:t:`kernelridge-Kostic2022`).
V_ : ndarray of shape (n_samples, rank)
Right projection matrix of the operator approximation:
:math:`k(\cdot, X) U V^\top k(\cdot, Y)`
(see :cite:t:`kernelridge-Kostic2022`).
rank_ : int
Effective rank of the fitted estimator.
Examples
--------
.. code-block:: python
>>> from kooplearn.datasets import make_linear_system
>>> from kooplearn.kernel import KernelRidge
>>> import numpy as np
>>>
>>> # Generate a linear system
>>> A = np.array([[0.9, 0.1], [-0.1, 0.9]])
>>> X0 = np.array([1.0, 0.0])
>>> data = make_linear_system(X0, A, n_steps=100, noise=0.1, random_state=42).to_numpy()
>>>
>>> # Fit the model
>>> model = KernelRidge(n_components=2, kernel='linear', alpha=1e-3)
>>> model = model.fit(data)
>>> # Predict the future state
>>> pred = model.predict(data)
>>>
>>> # Get the eigenvalues of the Koopman operator
>>> eigvals = model.eig()
"""
_parameter_constraints: dict = {
"n_components": [
Interval(Integral, 1, None, closed="left"),
None,
],
"lag_time": [
Interval(Integral, 1, None, closed="left"),
],
"reduced_rank": ["boolean"],
"kernel": [
StrOptions({"linear", "poly", "rbf", "sigmoid", "cosine"}),
callable,
],
"gamma": [
Interval(Real, 0, None, closed="left"),
None,
],
"degree": [Interval(Real, 0, None, closed="left")],
"coef0": [Interval(Real, None, None, closed="neither")],
"kernel_params": [dict, None],
"alpha": [
[Interval(Real, 0, None, closed="left")],
None,
],
"eigen_solver": [StrOptions({"auto", "dense", "arpack", "randomized"})],
"tol": [Interval(Real, 0, None, closed="left")],
"max_iter": [
Interval(Integral, 1, None, closed="left"),
None,
],
"iterated_power": [
Interval(Integral, 0, None, closed="left"),
StrOptions({"auto"}),
],
"n_oversamples": [Interval(Integral, 1, None, closed="left")],
"optimal_sketching": ["boolean"],
"random_state": ["random_state"],
"copy_X": ["boolean"],
"n_jobs": [None, Integral],
}
def __init__(
self,
n_components=None,
*,
lag_time=1,
reduced_rank=True,
kernel="linear",
gamma=None,
degree=3,
coef0=1,
kernel_params=None,
alpha=1e-6,
eigen_solver="auto",
tol=0,
max_iter=None,
iterated_power="auto",
n_oversamples=5,
optimal_sketching=False,
random_state=None,
copy_X=True,
n_jobs=None,
):
self.n_components = n_components
self.lag_time = lag_time
self.reduced_rank = reduced_rank
self.kernel = kernel
self.kernel_params = kernel_params
self.gamma = gamma
self.degree = degree
self.coef0 = coef0
self.alpha = alpha
self.eigen_solver = eigen_solver
self.tol = tol
self.max_iter = max_iter
self.iterated_power = iterated_power
self.n_oversamples = n_oversamples
self.optimal_sketching = optimal_sketching
self.random_state = random_state
self.n_jobs = n_jobs
self.copy_X = copy_X
[docs]
def fit(self, X, y=None):
"""
Fit the kernel model to data.
Depending on the model parameters, this method fits the estimator using
either a randomized or non-randomized algorithm, and either a full-rank
or reduced-rank regression approach.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Training trajectory data.
y : ndarray of shape (n_samples, n_features_out), default=None
Optional observable used for training.
If ``None``, the observable is assumed to be the state itself.
Returns
-------
self : object
Fitted model instance.
"""
self._pre_fit_checks(X)
if y is not None:
if y.shape[0] == X.shape[0]:
self.y_fit_ = y
else:
raise ValueError(
f"y has {y.shape[0]} samples, but X has {X.shape[0]}. "
"Both must have the same number of samples."
)
else:
self.y_fit_ = None
# Adjust number of components
if self.n_components is None:
n_components = self.kernel_X_.shape[0]
else:
n_components = min(self.kernel_X_.shape[0], self.n_components)
# Choose eigen solver
if self.eigen_solver == "auto":
if self.kernel_X_.shape[0] > 200 and n_components < 10:
eigen_solver = "arpack"
else:
eigen_solver = "dense"
else:
eigen_solver = self.eigen_solver
# Adjust regularization parameter
if self.alpha is None:
alpha = 0.0
else:
alpha = self.alpha
# Set iterated power
if self.iterated_power == "auto":
iterated_power = 7 if n_components < 0.1 * min(X.shape) else 4
else:
iterated_power = self.iterated_power
# Compute regression
if self.reduced_rank:
if eigen_solver == "randomized":
if alpha == 0.0:
raise ValueError(
"Tikhonov regularization must be specified when "
"solver is randomized."
)
fit_result = _regressors.rand_reduced_rank(
self.kernel_X_,
self.kernel_Y_,
alpha,
n_components,
self.n_oversamples,
self.optimal_sketching,
iterated_power,
random_state=self.random_state,
)
else:
fit_result = _regressors.reduced_rank(
self.kernel_X_,
self.kernel_Y_,
alpha,
n_components,
eigen_solver,
self.tol,
self.max_iter,
self.random_state,
)
else:
if eigen_solver == "randomized":
fit_result = _regressors.rand_pcr(
self.kernel_X_,
alpha,
n_components,
self.n_oversamples,
iterated_power,
random_state=self.random_state,
)
else:
fit_result = _regressors.pcr(
self.kernel_X_,
alpha,
n_components,
eigen_solver,
self.tol,
self.max_iter,
self.random_state,
)
self._fit_result = fit_result
self.U_, self.V_, self._spectral_biases = fit_result.values()
self.rank_ = self.U_.shape[1]
logger.info(f"Fitted {self.__class__.__name__} model.")
return self
[docs]
def predict(self, X, n_steps=1, observable=False) -> ndarray:
r"""
Predict the system state or its expected value after ``n_steps``.
Computes the predicted state — or, in the case of a stochastic system,
the expected value :math:`\mathbb{E}[X_t \mid X_0 = X]` — after
``t = n_steps`` time steps given the initial conditions ``X``.
If ``observable=True``, returns the corresponding predicted observable
instead of the state.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Initial conditions used for prediction.
n_steps : int, default=1
Number of future time steps to predict. Only the final predicted state
(at time ``t = n_steps``) is returned.
observable : bool, default=False
If ``True``, returns the predicted observable at time :math:`t`
instead of the system state.
Returns
-------
ndarray of shape (n_samples, n_features)
Predicted (expected) state or observable at time :math:`t = n_steps`.
"""
check_is_fitted(self)
X = validate_data(self, X, reset=False, copy=self.copy_X)
X_fit, _ = self._split_trajectory(self.X_fit_)
K_Xin_X = self._get_kernel(X, X_fit)
if observable:
if self.y_fit_ is not None:
observable_fit, _ = self._split_trajectory(self.y_fit_)
else:
raise ValueError(
"Observable should be passed when calling fit as the y parameter."
)
else:
observable_fit = X_fit
pred = _regressors.predict(
n_steps,
self._fit_result,
self.kernel_YX_,
K_Xin_X,
observable_fit,
)
return pred
[docs]
def risk(self, X=None):
"""Compute the estimator risk.
Parameters
----------
X : ndarray or None, default=None
Trajectory used to evaluate the risk. If None, evaluates on
training data.
Returns
-------
float
Risk of the estimator.
"""
check_is_fitted(self)
if X is not None:
X = validate_data(self, X, reset=False, copy=self.copy_X)
if X.shape[0] < 1 + self.lag_time:
raise ValueError(
f"X has only {X.shape[0]} samples, but at least"
f"{1 + self.lag_time} are required."
)
X_val, Y_val = self._split_trajectory(X)
X_train, Y_train = self._split_trajectory(self.X_fit_)
kernel_Yv = self._get_kernel(Y_val)
kernel_XXv = self._get_kernel(X_train, X_val)
kernel_YYv = self._get_kernel(Y_train, Y_val)
else:
kernel_Yv = self.kernel_Y_
kernel_XXv = self.kernel_X_
kernel_YYv = self.kernel_Y_
return _regressors.estimator_risk(
self._fit_result,
kernel_Yv,
self.kernel_Y_,
kernel_XXv,
kernel_YYv,
)
[docs]
def eig(self, eval_left_on=None, eval_right_on=None):
"""
Compute the eigendecomposition of the learned evolution operator.
This method returns the eigenvalues of the estimated evolution
operator, and optionally evaluates the corresponding left and/or right
eigenfunctions on user-provided data.
Parameters
----------
eval_left_on : ndarray of shape (n_samples, n_features), optional
Data points on which to evaluate the **left** eigenfunctions.
If ``None``, left eigenfunctions are not evaluated.
eval_right_on : ndarray of shape (n_samples, n_features), optional
Data points on which to evaluate the **right** eigenfunctions.
If ``None``, right eigenfunctions are not evaluated.
Returns
-------
eigenvalues : ndarray of shape (n_components,)
Eigenvalues of the estimated operator.
left_eigenfunctions : ndarray of shape (n_samples, n_components), optional
Values of the left eigenfunctions evaluated on ``eval_left_on``.
Returned only if ``eval_left_on`` is provided.
right_eigenfunctions : ndarray of shape (n_samples, n_components), optional
Values of the right eigenfunctions evaluated on ``eval_right_on``.
Returned only if ``eval_right_on`` is provided.
"""
check_is_fitted(self)
eig_result = _regressors.eig(self._fit_result, self.kernel_X_, self.kernel_YX_)
X_fit, Y_fit = self._split_trajectory(self.X_fit_)
if eval_left_on is None and eval_right_on is None:
# (eigenvalues,)
return eig_result["values"]
elif eval_left_on is None and eval_right_on is not None:
# (eigenvalues, right eigenfunctions)
kernel_Xin_X_or_Y = self._get_kernel(eval_right_on, X_fit)
return eig_result["values"], _regressors.evaluate_eigenfunction(
eig_result, "right", kernel_Xin_X_or_Y
)
elif eval_left_on is not None and eval_right_on is None:
# (eigenvalues, left eigenfunctions)
kernel_Xin_X_or_Y = self._get_kernel(eval_left_on, Y_fit)
return eig_result["values"], _regressors.evaluate_eigenfunction(
eig_result, "left", kernel_Xin_X_or_Y
)
elif eval_left_on is not None and eval_right_on is not None:
# (eigenvalues, left eigenfunctions, right eigenfunctions)
kernel_Xin_X_or_Y_left = self._get_kernel(eval_left_on, Y_fit)
kernel_Xin_X_or_Y_right = self._get_kernel(eval_right_on, X_fit)
return (
eig_result["values"],
_regressors.evaluate_eigenfunction(
eig_result, "left", kernel_Xin_X_or_Y_left
),
_regressors.evaluate_eigenfunction(
eig_result, "right", kernel_Xin_X_or_Y_right
),
)
[docs]
def dynamical_modes(self, X, observable=False) -> DynamicalModes:
"""
Compute the mode decomposition of arbitrary observables of the
evolution operator at the states defined by ``X``.
If :math:`(\\lambda_i, \\xi_i, \\psi_i)_{i = 1}^{r}` are eigentriplets of the evolution
operator, for any observable :math:`f` the i-th mode of :math:`f` at :math:`x` is defined
as: :math:`\\lambda_i \\langle \\xi_i, f \\rangle \\psi_i(x)`.
See :cite:t:`kernelridge-Kostic2022` for more details.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
States at which to evaluate the modes.
observable : bool, default=False
If ``True``, computes the modes of the observable rather than those
of the system state.
Returns
-------
DynamicalModes
Object containing the eigenvalues, modes, and their projections.
See :class:`kooplearn.structs.DynamicalModes` for details.
"""
check_is_fitted(self)
X = validate_data(self, X, reset=False, copy=self.copy_X)
if X.shape[0] < 1 + self.lag_time:
raise ValueError(
f"X has only {X.shape[0]} samples, but at least "
f"{1 + self.lag_time} are required."
)
# Compute eigendecomposition
eig_result = _regressors.eig(self._fit_result, self.kernel_X_, self.kernel_YX_)
# Evaluate the right eigenfunctions on the input points X, see also self.eig
X_fit, Y_fit = self._split_trajectory(self.X_fit_)
K_Xin_X = self._get_kernel(X, X_fit)
right_eigenfunctions = _regressors.evaluate_eigenfunction(
eig_result, "right", K_Xin_X
) # [num_initial_conditions, rank]
# Project the observable onto the left eigenfunctions
if observable:
if self.y_fit_ is not None:
_, observable_fit = self._split_trajectory(self.y_fit_)
else:
raise ValueError(
"Observable should be passed when calling fit as the y parameter."
)
else:
observable_fit = Y_fit # [num_training_points, num_features]
left_projections = (
(eig_result["left"].T) @ observable_fit / (Y_fit.shape[0] ** 0.5)
)
dmd = DynamicalModes(
eig_result["values"], right_eigenfunctions, left_projections
)
return dmd
[docs]
def score(
self, X=None, y=None, n_steps=1, observable=False, metric=r2_score, **metric_kws
) -> float:
"""
Score the model predictions for timestep ``n_steps``.
Computes the ``metric`` (default is ``sklearn.metrics.r2_score``) evaluated between
the model predictions at timestep ``n_steps`` and the true system state (or observable,
if ``observable=True``) at the same timestep.
Parameters:
X : ndarray of shape (n_samples, n_features) or None, default=None
Trajectory used to compute the score. If None, evaluates on
training data.
y : ndarray of shape (n_samples, n_features) or None, default=None
Optional observable used to compute the score.
n_steps : int, default=1
Number of future time steps on which to compute the score. Only the
predictions at the final timestep (``t = n_steps``) are compared
to the true system state (or observable).
observable : bool, default=False
If ``True``, returns the predicted observable at time :math:`t`
instead of the system state.
metric: callable (default=r2_score)
The metric function used to score the model predictions.
metric_kws: dict
Optional parameters to pass to the metric function.
Returns:
score: float
Metric function value for the model predictions at the next timestep.
"""
check_is_fitted(self)
# Case 1: Using training data
if X is None:
X_test, Y_test = self._split_trajectory(self.X_fit_)
if observable:
if self.y_fit_ is not None:
_, target = self._split_trajectory(self.y_fit_)
else:
raise ValueError(
"Cannot score on observable: no training observable was provided during" \
" fit."
)
else:
target = Y_test
# Case 2: Using provided test data
else:
X = validate_data(self, X, reset=False, copy=self.copy_X)
if X.shape[0] < 1 + self.lag_time:
raise ValueError(
f"X has only {X.shape[0]} samples, but at least "
f"{1 + self.lag_time} are required."
)
X_test, Y_test = self._split_trajectory(X)
if observable:
if y is None:
raise ValueError(
"When observable=True and X is provided, y must contain the " \
"corresponding observable values."
)
if y.shape[0] != X.shape[0]:
raise ValueError(
f"y has {y.shape[0]} samples, but X has {X.shape[0]}. "
"Both must have the same number of samples."
)
_, target = self._split_trajectory(y)
else:
target = Y_test
# Make predictions and align timestamps
pred = self.predict(X_test, n_steps=n_steps, observable=observable)
if n_steps > 1:
target = target[n_steps - 1 :]
pred = pred[: -(n_steps - 1)]
return metric(target, pred, **metric_kws)
def _svals(self):
"""Singular values of the Koopman/Transfer operator.
Returns:
The estimated singular values of the Koopman/Transfer operator. Array of shape
`(n_components,)`.
"""
check_is_fitted(self)
return _regressors.svdvals(self._fit_result, self.kernel_X_, self.kernel_Y_)
def _get_kernel(self, X, Y=None):
"""Compute the pairwise kernel matrix."""
if callable(self.kernel):
params = self.kernel_params or {}
else:
params = {
"gamma": self.gamma_,
"degree": self.degree,
"coef0": self.coef0,
}
if Y is None:
Y = X
return pairwise_kernels(
X,
Y,
metric=self.kernel,
filter_params=True,
n_jobs=self.n_jobs,
**params,
)
def _init_kernels(self, X, Y):
"""Initialize kernel matrices for training."""
K_X = self._get_kernel(X)
K_Y = self._get_kernel(Y)
K_YX = self._get_kernel(Y, X)
return K_X, K_Y, K_YX
def _pre_fit_checks(self, X):
"""Perform pre-fit checks and initialize kernel matrices."""
X = validate_data(self, X, copy=self.copy_X)
if X.shape[0] < 1 + self.lag_time:
raise ValueError(
f"X has only {X.shape[0]} samples, but at least "
f"{1 + self.lag_time} are required."
)
self.gamma_ = 1 / X.shape[1] if self.gamma is None else self.gamma
self.X_fit_ = X
X_fit, Y_fit = self._split_trajectory(X)
self.kernel_X_, self.kernel_Y_, self.kernel_YX_ = self._init_kernels(
X_fit, Y_fit
)
def _split_trajectory(self, X):
"""Split a trajectory into context and target pairs."""
return X[: -self.lag_time], X[self.lag_time :]
def __sklearn_tags__(self):
tags = super().__sklearn_tags__()
tags.target_tags.required = False
tags.requires_fit = True
tags.non_deterministic = (
self.eigen_solver == "randomized" or self.eigen_solver == "arpack"
)
return tags