"""Kernel methods for Koopman/Transfer operator learning."""
# Authors: The kooplearn developers
# SPDX-License-Identifier: MIT
import logging
from numbers import Integral, Real
import numpy as np
from numpy import ndarray
from sklearn.base import BaseEstimator
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.utils._param_validation import Interval, StrOptions
from sklearn.utils.validation import check_is_fitted, validate_data
from kooplearn.kernel import _regressors, _utils
from kooplearn.structs import DynamicalModes
logger = logging.getLogger("kooplearn")
[docs]
class GeneratorDirichlet(BaseEstimator):
r"""
Kernel-based estimator for the infinitesimal generator of diffusion
processes using the Dirichlet-form method.
This class approximates the infinitesimal generator
:math:`\mathcal{L}` of a diffusion process by embedding data into a
reproducing kernel Hilbert space (RKHS). The generator is learned
from samples of the invariant distribution, without
explicit knowledge of the drift or diffusion coefficients.
The estimator is designed for data generated by a general diffusion
process of the form
.. math::
\mathrm{d} X_t
=
a(X_t)\, \mathrm{d} t
+
b(X_t)\, \mathrm{d} W_t,
where :math:`a : \mathbb{R}^d \to \mathbb{R}^d` is the drift field,
:math:`b : \mathbb{R}^d \to \mathbb{R}^{d \times m}` is the diffusion
coefficient, and :math:`W_t` denotes an :math:`m`-dimensional standard
Brownian motion.
Assuming sufficient smoothness, the corresponding infinitesimal
generator acts on smooth test functions as
.. math::
(\mathcal{L} f)(x)
=
\nabla f(x)^\top a(x)
+
\frac{1}{2}\,
\operatorname{Tr}\!\left[
b(x)^\top (\nabla^2 f(x))\, b(x)
\right],
where :math:`\nabla^2 f(x)` denotes the Hessian of :math:`f`.
The generator governs the evolution of observables through the
backward Kolmogorov equation
.. math::
\frac{\partial}{\partial t} u(x,t)
=
\mathcal{L} u(x,t),
\qquad
u(x,0) = f(x),
with :math:`u(x,t) = \mathbb{E}[f(X_t) \mid X_0 = x]`.
The estimator constructs a kernelized approximation of the resolvent
of the generator :math:`( \mu I - \mathcal{L})^{-1}`
using first- and second-order derivatives of the kernel and solves a
regularized variational problem to recover generator eigenvalues and
eigenfunctions.
Parameters
----------
diffusion : float or ndarray
Diffusion specification for the SDE. The input is interpreted as the
diffusion matrix directly (no multiplication to form :math:`D = b b^\top`).
The allowed formats are:
1. **Scalar diffusion**
If a float is provided, the diffusion is assumed isotropic, the same
for all dimensions.
2. **constant diffusion**
If an array of shape ``(n_features,)`` is provided, the diffusion is
taken to be constant.
3. **State dependant diffusion**
If an array of shape ``(n_samples, n_features)`` is provided, it is
interpreted as a state dependant diffusion.
n_components : int or None, optional
Number of generator eigenmodes to retain. If ``None``, all components
are kept.
gamma : float, optional
RBF kernel scale parameter. If ``None``, defaults to
``1 / n_features``.
alpha : float or None, default=1e-6
Tikhonov regularization parameter for the variational problem.
If ``None``, a specialized unregularized solver is used.
n_jobs : int, default=1
Number of parallel workers for kernel computation.
shift : float, default=1.0
Positive shift $\mu$ to define the resolvent operator.
Attributes
----------
X_fit_ : ndarray of shape (n_samples, n_features)
Training data used for fitting.
gamma_ : float
Effective kernel parameter.
kernel_X_ : ndarray of shape (n_samples, n_samples)
Kernel Gram matrix.
eigresults : dict
Result of the eigendecomposition step. Contains entries:
- ``"values"`` : ndarray of shape (r,)
Generator eigenvalues.
- ``"left"`` : ndarray of shape (n_samples, r)
Left eigenfunctions evaluated on the data.
- ``"right"`` : ndarray
Right eigenfunctions represented in RKHS coordinates.
rank_ : int
Number of retained eigenmodes.
Notes
-----
This implementation follows :cite:t:`kostic2024learning`
and provides a kernel-based estimator of the infinitesimal generator
from equilibrium samples of the overdamped Langevin dynamics.
.. attention::
Currently, only the RBF kernel is supported.
Examples
--------
>>> import numpy as np
>>> from kooplearn.kernel import GeneratorDirichlet
>>> from kooplearn.datasets import make_prinz_potential
>>> X = make_prinz_potential(X0=0, n_steps=500, gamma=1.0, sigma=2.0)
>>> model = GeneratorDirichlet(
... diffusion=1.0,
... n_components=4,
... gamma=1.0,
... )
>>> model = model.fit(X)
>>> eigvals = model.eig()
>>> f_pred = model.predict(X, t=1.0)
"""
_parameter_constraints: dict = {
"n_components": [
Interval(Integral, 1, None, closed="left"),
None,
],
"kernel": [StrOptions({"rbf"})],
"gamma": [
Interval(Real, 0, None, closed="left"),
None,
],
"alpha": [
[Interval(Real, 0, None, closed="left")],
None,
],
}
def __init__(
self,
diffusion,
n_components=None,
*,
gamma=None,
alpha=1e-6,
n_jobs=1,
shift=1.0,
):
self.n_components = n_components
self.kernel = "rbf"
self.gamma = gamma
self.alpha = alpha
self.n_jobs = n_jobs
self.diffusion = diffusion
self.shift = shift
[docs]
def fit(self, X, y=None):
"""
Fit the Dirichlet-form kernel model to trajectory data.
This computes:
- The Gram matrix ``K``,
- Its first and second kernel derivatives,
- An approximation of the generator via reduced-rank regression
:cite:t:`kostic2024learning`,
- Its eigenvalues and eigenfunctions.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Training states sampled from a diffusion process.
y : ndarray of shape (n_samples, n_features_out), default=None
Optional observable used for training.
If ``None``, the observable is assumed to be the state itself.
Returns
-------
self : GeneratorDirichlet
Fitted estimator.
"""
self._pre_fit_checks(X, self.diffusion)
if y is not None:
if y.shape[0] == X.shape[0]:
self.y_fit_ = y
else:
raise ValueError(
f"y has {y.shape[0]} samples, but X has {X.shape[0]}. "
"Both must have the same number of samples."
)
else:
self.y_fit_ = None
# Adjust number of components
if self.n_components is None:
n_components = self.kernel_X_.shape[0]
else:
n_components = min(self.kernel_X_.shape[0], self.n_components)
# Choose eigen solver
# Adjust regularization parameter
if self.alpha is None:
alpha = 0.0
else:
alpha = self.alpha
# Compute regression
self.eigresults = _regressors.reduced_rank_regression_dirichlet(
self.kernel_X_,
self.N_,
self.M_,
self.shift,
alpha,
n_components,
)
self.rank_ = self.eigresults["values"].shape[0]
logger.info(f"Fitted {self.__class__.__name__} model.")
return self
[docs]
def predict(self, X, t, observable=False) -> ndarray:
r"""
Predict the expected observable value at time :math:`t`, conditional on
the initial condition ``X``.
This computes:
.. math::
\mathbb{E}[f(X_t) \mid X_0 = X],
using the spectral representation of the generator and the Koopman
semigroup :math:`e^{t \mathcal{L}}`.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Evaluation points.
t : float
Time horizon for the Koopman propagation.
observable : bool, default=False
If ``True``, returns the predicted observable at time :math:`t`
instead of the system state.
Returns
-------
ndarray of shape (n_samples, n_dim)
Predicted observable value :math:`\mathbb{E}[f(X_t)]`.
"""
modes = self.dynamical_modes(X, observable)
pred = _regressors.predict_generator(t, modes)
return pred
[docs]
def eig(self, eval_left_on=None, eval_right_on=None):
r"""
Predict the expected observable value at time :math:`t`, conditional on
the initial condition ``X``.
This computes:
.. math::
\mathbb{E}[f(X_t) \mid X_0 = X],
using the spectral representation of the generator and the Koopman
semigroup :math:`e^{t \mathcal{L}}`.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Evaluation points.
t : float
Time horizon for the Koopman propagation.
observable : ndarray of shape (n_samples, n_dim)
Observable :math:`f(X)` to propagate in time.
recompute : bool, default=True
If ``True``, recompute kernel matrices between ``X`` and ``X_fit_``.
If ``False``, reuse precomputed training kernels.
Returns
-------
ndarray of shape (n_samples, n_dim)
Predicted observable value :math:`\mathbb{E}[f(X_t)]`.
"""
check_is_fitted(self)
if eval_left_on is None and eval_right_on is None:
# (eigenvalues,)
return self.eigresults["values"]
elif eval_left_on is None and eval_right_on is not None:
# (eigenvalues, right eigenfunctions)
eval_right_on = validate_data(self, eval_right_on, reset=False)
kernel_Xin_X_or_Y, N_Xin_X_or_Y = self._get_kernel(
eval_right_on, self.X_fit_, get_derivatives=True
)
block_matrix = np.block(
[np.sqrt(self.shift) * kernel_Xin_X_or_Y, N_Xin_X_or_Y]
)
return self.eigresults["values"], np.sqrt(
2
) * _regressors.evaluate_eigenfunction(
self.eigresults, "right", block_matrix
)
elif eval_left_on is not None and eval_right_on is None:
# (eigenvalues, left eigenfunctions)
eval_left_on = validate_data(self, eval_left_on, reset=False)
kernel_Xin_X_or_Y = self._get_kernel(eval_left_on, self.X_fit_)
return self.eigresults["values"], _regressors.evaluate_eigenfunction(
self.eigresults, "left", kernel_Xin_X_or_Y
)
elif eval_left_on is not None and eval_right_on is not None:
# (eigenvalues, left eigenfunctions, right eigenfunctions)
eval_right_on = validate_data(self, eval_right_on, reset=False)
eval_left_on = validate_data(self, eval_left_on, reset=False)
kernel_Xin_X_or_Y, N_Xin_X_or_Y = self._get_kernel(
eval_right_on, self.X_fit_, get_derivatives=True
)
block_matrix = np.block(
[np.sqrt(self.shift) * kernel_Xin_X_or_Y, N_Xin_X_or_Y]
)
return (
self.eigresults["values"],
_regressors.evaluate_eigenfunction(
self.eigresults, "left", kernel_Xin_X_or_Y
),
np.sqrt(2)
* _regressors.evaluate_eigenfunction(
self.eigresults, "right", block_matrix
),
)
[docs]
def dynamical_modes(self, X, observable=False) -> DynamicalModes:
"""
Compute the dynamical mode decomposition of an observable.
For an observable :math:`f`, its expansion in generator modes is:
.. math::
f(x) = \\sum_{i=1}^r \\langle \\xi_i, f \\rangle \\, \\psi_i(x),
where :math:`\\xi_i` and :math:`\\psi_i` are left and right eigenfunctions.
Time evolution under the semigroup :math:`e^{t\\mathcal{L}}` acts as:
.. math::
f_t(x) = \\sum_i e^{t \\lambda_i} \\langle \\xi_i, f \\rangle \\psi_i(x).
Parameters
----------
X : ndarray
Points at which the right eigenfunctions will be evaluated.
observable : bool, default=False
If ``True``, returns the predicted observable at time :math:`t`
instead of the system state.
Returns
-------
DynamicalModes
Structured object containing:
- eigenvalues :math:`e^{\\lambda_i}`,
- mode coefficients,
- conditioning factors for evolution.
"""
check_is_fitted(self)
X = validate_data(self, X, reset=False)
if observable:
observable_fit_ = self.y_fit_
else:
observable_fit_ = self.X_fit_
levecs = self.eigresults["left"]
npts = levecs.shape[
0
] # We use the eigenvector to be consistent with the dirichlet estimator that does not
# have the same shape #obs_train.shape[0]
K_Xin_X, N_Xin_X = self._get_kernel(X, self.X_fit_, get_derivatives=True)
block_matrix = np.block([np.sqrt(self.shift) * K_Xin_X, N_Xin_X])
conditioning = np.sqrt(2) * _regressors.evaluate_eigenfunction(
self.eigresults, "right", block_matrix
) # [num_initial_conditions, rank]
modes_ = np.einsum(
"nr,nd" + "->rd", levecs.conj(), np.sqrt(self.shift) * observable_fit_
) / np.sqrt(npts) # [rank, features]
# modes_ = np.expand_dims(modes_, axis=1)
result = DynamicalModes(np.exp(self.eigresults["values"]), conditioning, modes_)
return result
def _get_kernel(
self, X, Y=None, get_derivatives=False, get_second_derivatives=False
):
"""Compute the pairwise kernel matrix."""
if not (isinstance(self.kernel, str) and self.kernel == "rbf"):
raise NotImplementedError
params = {
"gamma": self.gamma_,
}
length_scale = 1 / np.sqrt(2 * self.gamma_)
if Y is None:
Y = X
if get_derivatives and get_second_derivatives:
K_X = pairwise_kernels(
X,
Y,
metric=self.kernel,
filter_params=True,
n_jobs=self.n_jobs,
**params,
)
N = _utils.return_grad(K_X, X, Y, self.diffusion, length_scale)
M = _utils.return_grad2(K_X, X, Y, self.diffusion, length_scale)
return K_X, M, N
elif get_derivatives:
K_X = pairwise_kernels(
X,
Y,
metric=self.kernel,
filter_params=True,
n_jobs=self.n_jobs,
**params,
)
N = _utils.return_grad(K_X, X, Y, self.diffusion, length_scale)
return K_X, N
else:
return pairwise_kernels(
X,
Y,
metric=self.kernel,
filter_params=True,
n_jobs=self.n_jobs,
**params,
)
def _init_kernels(self, X):
"""Initialize kernel matrices for training."""
K_X, M, N = self._get_kernel(
X, get_derivatives=True, get_second_derivatives=True
)
return K_X, M, N
def _pre_fit_checks(self, X, diffusion):
"""Perform pre-fit checks and initialize kernel matrices."""
n_samples, d = X.shape
if np.isscalar(diffusion):
# Broadcast scalar to shape (n_samples, d)
self.diffusion = np.full((n_samples, d), diffusion)
# Case 2: constant vector of length d
elif isinstance(diffusion, np.ndarray):
if diffusion.ndim == 1 and diffusion.shape == (d,):
# Broadcast vector to all samples
self.diffusion = np.tile(diffusion[None, :], (n_samples, 1))
elif diffusion.ndim == 2 and diffusion.shape == (n_samples, d):
# Case 3: sample-dependent vectors (n_samples, d)
self.diffusion = diffusion
else:
raise ValueError(
f"diffusion must be a float, or a ndarray of shape ({d=},), or" \
" ({n_samples=}, {d=}) while got {diffusion.shape}."
)
else:
raise ValueError(
f"diffusion must be a float or a numpy array, while got {type(diffusion)}."
)
self.diffusion /= np.sqrt(2) # The genertor prefactor is diffusion/sqrt(2)
X = validate_data(self, X)
self.gamma_ = 1 / X.shape[1] if self.gamma is None else self.gamma
self.X_fit_ = X
self.kernel_X_, self.M_, self.N_ = self._init_kernels(self.X_fit_)
def __sklearn_tags__(self):
tags = super().__sklearn_tags__()
tags.target_tags.required = False
tags.requires_fit = True
return tags