"""
Spectral decomposition solver for Fokker-Planck operator
associated with overdamped Langevin dynamics in 1D and 2D.
The operator L = -∇·(∇V ·) + Δ is discretized using cosine basis functions
with Galerkin projection.
"""
import numpy as np
from scipy.linalg import eig
# ============================================================================
# 1D Basis Functions and Quadrature
# ============================================================================
def build_cosine_basis_1d(domain_min, domain_max, num_basis, num_quad_points=None):
"""
Construct 1D cosine basis functions and derivatives on quadrature grid.
Args:
domain_min (float): Left boundary of domain [a, b]
domain_max (float): Right boundary of domain
num_basis (int): Number of cosine basis functions
num_quad_points (int, optional): Number of quadrature points.
Defaults to 2 * num_basis
Returns:
quad_points (ndarray): Quadrature points, shape (num_quad_points,)
quad_weights (ndarray): Trapezoidal quadrature weights
basis_vals (ndarray): Basis function values, shape (num_basis, num_quad_points)
basis_derivs (ndarray): Basis function derivatives, shape (num_basis, num_quad_points)
"""
if num_quad_points is None:
num_quad_points = 2 * num_basis
# Quadrature grid with trapezoidal weights
quad_points = np.linspace(domain_min, domain_max, num_quad_points)
domain_length = domain_max - domain_min
quad_weights = np.ones(num_quad_points) * domain_length / (num_quad_points - 1)
quad_weights[0] /= 2
quad_weights[-1] /= 2
# Scaled coordinates in [0, 1]
scaled_coords = (quad_points - domain_min) / domain_length
# Cosine basis: φ_k(x) = cos(kπ(x-a)/(b-a))
mode_indices = np.arange(num_basis)
basis_vals = np.cos(np.pi * np.outer(mode_indices, scaled_coords))
# Derivatives: φ'_k(x) = -kπ/(b-a) sin(kπ(x-a)/(b-a))
basis_derivs = (-np.pi * mode_indices[:, None] / domain_length) * np.sin(
np.pi * np.outer(mode_indices, scaled_coords)
)
return quad_points, quad_weights, basis_vals, basis_derivs
# ============================================================================
# 1D Matrix Assembly
# ============================================================================
def assemble_operators_1d(
domain_min,
domain_max,
num_basis,
potential_gradient,
gamma,
sigma,
num_quad_points=None,
):
"""Assemble 1D Fokker-Planck operator matrices using Galerkin projection.
The operator is L = -d/dx(V'(x) ·) - d²/dx²
Parameters
----------
domain_min : float
Left boundary of the domain.
domain_max : float
Right boundary of the domain.
num_basis : int
Number of cosine basis functions.
potential_gradient : callable
Function ``V'(x)`` returning the gradient at points ``x``.
gamma : float
Friction coefficient of the Langevin dynamics.
sigma : float
Noise amplitude of the Langevin dynamics.
num_quad_points : int, optional
Number of quadrature points. If ``None``, defaults to ``2 * num_basis``.
Returns
-------
stiffness_matrix : np.ndarray
Stiffness matrix (A) of shape ``(num_basis, num_basis)``.
mass_matrix : np.ndarray
Mass matrix (B) of shape ``(num_basis, num_basis)``.
"""
quad_points, quad_weights, basis_vals, basis_derivs = build_cosine_basis_1d(
domain_min, domain_max, num_basis, num_quad_points
)
# Evaluate potential gradient at quadrature points
gradient_vals = potential_gradient(quad_points)
# Mass matrix: B_ij = ∫ φ_i φ_j dx
mass_matrix = np.tensordot(basis_vals, basis_vals * quad_weights, axes=(1, 1))
# Diffusion (stiffness): -∫ φ'_i φ'_j dx
diffusion_matrix = -np.tensordot(
basis_derivs, basis_derivs * quad_weights, axes=(1, 1)
)
# Advection: -∫ φ_i V'(x) φ'_j dx
advection_matrix = -np.tensordot(
basis_vals, (gradient_vals[None, :] * basis_derivs) * quad_weights, axes=(1, 1)
)
stiffness_matrix = (
sigma * sigma * diffusion_matrix / (2 * gamma) + advection_matrix
) / gamma
return stiffness_matrix, mass_matrix
# ============================================================================
# 2D Matrix Assembly
# ============================================================================
def assemble_operators_2d(
domain_bounds,
num_basis,
potential_gradient,
gamma,
sigma,
num_quad_points=None,
method="vectorized",
):
"""
Assemble 2D Fokker-Planck operator using tensor product cosine basis.
The operator is L = -∇·(∇V ·) - Δ
Args:
domain_bounds (tuple): (x_min, x_max, y_min, y_max)
num_basis (tuple): (num_basis_x, num_basis_y) number of modes per dimension
potential_gradient (callable): Function returning (grad_x, grad_y)
given arrays X, Y of shape (Mx, My)
num_quad_points (tuple, optional): (num_quad_x, num_quad_y)
method (str): Assembly method - 'vectorized' (fast, more memory) or
'kronecker' (slower, less memory). Default: 'vectorized'
Returns:
stiffness_matrix (ndarray): Operator matrix A, shape (N, N) where N = Nx*Ny
mass_matrix (ndarray): Mass matrix B, shape (N, N)
"""
x_min, x_max, y_min, y_max = domain_bounds
num_basis_x, num_basis_y = num_basis
if num_quad_points is None:
num_quad_points = (2 * num_basis_x, 2 * num_basis_y)
num_quad_x, num_quad_y = num_quad_points
# 1D quadrature rules
x_quad = np.linspace(x_min, x_max, num_quad_x)
y_quad = np.linspace(y_min, y_max, num_quad_y)
weights_x = np.ones(num_quad_x) * (x_max - x_min) / (num_quad_x - 1)
weights_x[0] /= 2
weights_x[-1] /= 2
weights_y = np.ones(num_quad_y) * (y_max - y_min) / (num_quad_y - 1)
weights_y[0] /= 2
weights_y[-1] /= 2
# 2D quadrature grid
X_grid, Y_grid = np.meshgrid(x_quad, y_quad, indexing="ij")
weights_2d = weights_x[:, None] * weights_y[None, :]
# Evaluate potential gradient on grid
grad_x, grad_y = potential_gradient(X_grid, Y_grid)
# Build 1D cosine basis functions and derivatives
mode_idx_x = np.arange(num_basis_x).reshape(-1, 1)
mode_idx_y = np.arange(num_basis_y).reshape(-1, 1)
# Basis functions
x_scaled = (x_quad - x_min) / (x_max - x_min)
y_scaled = (y_quad - y_min) / (y_max - y_min)
basis_x = np.cos(mode_idx_x * np.pi * x_scaled) # (Nx, num_quad_x)
basis_y = np.cos(mode_idx_y * np.pi * y_scaled) # (Ny, num_quad_y)
# Derivatives
deriv_x = (
-mode_idx_x * np.pi / (x_max - x_min) * np.sin(mode_idx_x * np.pi * x_scaled)
)
deriv_y = (
-mode_idx_y * np.pi / (y_max - y_min) * np.sin(mode_idx_y * np.pi * y_scaled)
)
# Choose assembly method
if method == "kronecker":
# Memory-efficient Kronecker product approach (less memory, slightly slower)
stiffness_matrix, mass_matrix = _assemble_2d_kronecker(
basis_x,
basis_y,
deriv_x,
deriv_y,
weights_x,
weights_y,
grad_x,
grad_y,
gamma,
sigma,
)
else: # method == 'vectorized'
# Fast vectorized approach (more memory, much faster)
stiffness_matrix, mass_matrix = _assemble_2d_vectorized(
num_basis_x,
num_basis_y,
num_quad_x,
num_quad_y,
basis_x,
basis_y,
deriv_x,
deriv_y,
weights_2d,
grad_x,
grad_y,
gamma,
sigma,
)
return stiffness_matrix, mass_matrix
def _assemble_2d_vectorized(
num_basis_x,
num_basis_y,
num_quad_x,
num_quad_y,
basis_x,
basis_y,
deriv_x,
deriv_y,
weights_2d,
grad_x,
grad_y,
gamma,
sigma,
):
"""Assemble 2D operator matrices using a vectorized approach with einsum.
This method is fast but memory-intensive as it constructs the full 2D
tensor product basis functions.
Parameters
----------
num_basis_x : int
Number of basis functions in the x-dimension.
num_basis_y : int
Number of basis functions in the y-dimension.
num_quad_x : int
Number of quadrature points in the x-dimension.
num_quad_y : int
Number of quadrature points in the y-dimension.
basis_x : np.ndarray
1D basis functions for the x-dimension, of shape
``(num_basis_x, num_quad_x)``.
basis_y : np.ndarray
1D basis functions for the y-dimension, of shape
``(num_basis_y, num_quad_y)``.
deriv_x : np.ndarray
1D basis derivatives for the x-dimension, of shape
``(num_basis_x, num_quad_x)``.
deriv_y : np.ndarray
1D basis derivatives for the y-dimension, of shape
``(num_basis_y, num_quad_y)``.
weights_2d : np.ndarray
2D quadrature weights, of shape ``(num_quad_x, num_quad_y)``.
grad_x : np.ndarray
Potential gradient in the x-dimension on the quadrature grid,
of shape ``(num_quad_x, num_quad_y)``.
grad_y : np.ndarray
Potential gradient in the y-dimension on the quadrature grid,
of shape ``(num_quad_x, num_quad_y)``.
gamma : float
Friction coefficient of the Langevin dynamics.
sigma : float
Noise amplitude of the Langevin dynamics.
Returns
-------
stiffness_matrix : np.ndarray
Stiffness matrix (A) of shape ``(N, N)`` where ``N = num_basis_x * num_basis_y``.
mass_matrix : np.ndarray
Mass matrix (B) of shape ``(N, N)`` where ``N = num_basis_x * num_basis_y``.
"""
total_basis = num_basis_x * num_basis_y
# Precompute tensor product basis functions
basis_2d = np.zeros((total_basis, num_quad_x, num_quad_y))
deriv_x_2d = np.zeros((total_basis, num_quad_x, num_quad_y))
deriv_y_2d = np.zeros((total_basis, num_quad_x, num_quad_y))
idx = 0
for i in range(num_basis_x):
for j in range(num_basis_y):
basis_2d[idx] = np.outer(basis_x[i], basis_y[j])
deriv_x_2d[idx] = np.outer(deriv_x[i], basis_y[j])
deriv_y_2d[idx] = np.outer(basis_x[i], deriv_y[j])
idx += 1
# Vectorized assembly using Einstein summation
# Apply quadrature weights to basis functions once
basis_weighted = basis_2d * weights_2d[None, :, :]
deriv_x_weighted = deriv_x_2d * weights_2d[None, :, :]
deriv_y_weighted = deriv_y_2d * weights_2d[None, :, :]
# Mass matrix: B[i,j] = ∫∫ φ_i φ_j dx dy
# Using einsum: sum over spatial dimensions (axes 1,2)
mass_matrix = np.einsum("ixy,jxy->ij", basis_2d, basis_weighted)
# Diffusion term: -∫∫ (∇φ_i · ∇φ_j) dx dy
diffusion_matrix = -(
np.einsum("ixy,jxy->ij", deriv_x_2d, deriv_x_weighted)
+ np.einsum("ixy,jxy->ij", deriv_y_2d, deriv_y_weighted)
)
# Advection term: -∫∫ φ_i (∇V · ∇φ_j) dx dy
# Precompute drift-weighted derivatives
drift_deriv_x = grad_x * deriv_x_2d # (total_basis, Mx, My)
drift_deriv_y = grad_y * deriv_y_2d # (total_basis, Mx, My)
advection_matrix = -np.einsum(
"ixy,jxy->ij", basis_2d, (drift_deriv_x + drift_deriv_y) * weights_2d
)
stiffness_matrix = (
sigma * sigma * diffusion_matrix / (2 * gamma) + advection_matrix
) / gamma
return stiffness_matrix, mass_matrix
def _assemble_2d_kronecker(
basis_x,
basis_y,
deriv_x,
deriv_y,
weights_x,
weights_y,
grad_x,
grad_y,
gamma,
sigma,
):
"""Assemble 2D operator matrices using Kronecker products.
This memory-efficient method relies on 1D integrals and Kronecker products.
It is suitable for cases where the drift is separable or can be
approximated as separable.
Parameters
----------
basis_x : np.ndarray
1D basis functions for the x-dimension, of shape
``(num_basis_x, num_quad_x)``.
basis_y : np.ndarray
1D basis functions for the y-dimension, of shape
``(num_basis_y, num_quad_y)``.
deriv_x : np.ndarray
1D basis derivatives for the x-dimension, of shape
``(num_basis_x, num_quad_x)``.
deriv_y : np.ndarray
1D basis derivatives for the y-dimension, of shape
``(num_basis_y, num_quad_y)``.
weights_x : np.ndarray
1D quadrature weights for the x-dimension, of shape ``(num_quad_x,)``.
weights_y : np.ndarray
1D quadrature weights for the y-dimension, of shape ``(num_quad_y,)``.
grad_x : np.ndarray
Potential gradient in the x-dimension on the quadrature grid,
of shape ``(num_quad_x, num_quad_y)``.
grad_y : np.ndarray
Potential gradient in the y-dimension on the quadrature grid,
of shape ``(num_quad_x, num_quad_y)``.
gamma : float
Friction coefficient of the Langevin dynamics.
sigma : float
Noise amplitude of the Langevin dynamics.
Returns
-------
stiffness_matrix : np.ndarray
Stiffness matrix (A) of shape ``(N, N)`` where ``N = num_basis_x * num_basis_y``.
mass_matrix : np.ndarray
Mass matrix (B) of shape ``(N, N)`` where ``N = num_basis_x * num_basis_y``.
"""
# 1D mass and stiffness matrices
mass_x = basis_x @ np.diag(weights_x) @ basis_x.T
mass_y = basis_y @ np.diag(weights_y) @ basis_y.T
stiff_x = deriv_x @ np.diag(weights_x) @ deriv_x.T
stiff_y = deriv_y @ np.diag(weights_y) @ deriv_y.T
# Laplacian via Kronecker sum: -(A_x ⊗ M_y + M_x ⊗ A_y)
diffusion_matrix = -(np.kron(stiff_x, mass_y) + np.kron(mass_x, stiff_y))
# For advection, we need cross-terms which require 2D integration
# This is an approximation using coordinate-wise drift averages
drift_x_avg = np.mean(grad_x, axis=1) # Average over y
drift_y_avg = np.mean(grad_y, axis=0) # Average over x
# Advection matrices: -∫ φ_i V'(x) φ'_j dx (treating drift as separable)
adv_x = -basis_x @ np.diag(weights_x * drift_x_avg) @ deriv_x.T
adv_y = -basis_y @ np.diag(weights_y * drift_y_avg) @ deriv_y.T
advection_matrix = np.kron(adv_x, mass_y) + np.kron(mass_x, adv_y)
stiffness_matrix = (
sigma * sigma * diffusion_matrix / (2 * gamma) + advection_matrix
) / gamma
mass_matrix = np.kron(mass_x, mass_y)
return stiffness_matrix, mass_matrix
# ============================================================================
# Eigenfunction Evaluation
# ============================================================================
def eval_eigenfunctions_1d(eigenvectors, eval_points, domain_min, domain_max):
"""
Evaluate 1D eigenfunctions from cosine basis expansion coefficients.
Args:
eigenvectors (ndarray): (num_basis, num_modes) matrix of eigenvector coefficients
eval_points (ndarray): Points where to evaluate the eigenfunctions
domain_min, domain_max (float): Domain boundaries [a, b]
Returns:
eigenfunctions (ndarray): Shape (len(eval_points), num_modes)
eigenfunction values at evaluation points
"""
num_basis = eigenvectors.shape[0]
mode_indices = np.arange(num_basis)
# Cosine basis evaluated at eval_points
scaled_points = (eval_points - domain_min) / (domain_max - domain_min)
basis_at_points = np.cos(np.pi * np.outer(mode_indices, scaled_points))
# Linear combination: u_j(x) = Σ_k c^(j)_k φ_k(x)
eigenfunctions = basis_at_points.T @ eigenvectors
return eigenfunctions
def eval_eigenfunctions_2d(eigenvectors, x_points, y_points, domain_bounds, num_basis):
"""
Evaluate 2D eigenfunctions on a grid from cosine basis expansion coefficients.
Args:
eigenvectors (ndarray): (N, num_modes) eigenvector coefficients
where N = num_basis_x * num_basis_y (row-major flattening)
x_points, y_points (ndarray): 1D coordinate arrays for evaluation grid
domain_bounds (tuple): (x_min, x_max, y_min, y_max)
num_basis (tuple): (num_basis_x, num_basis_y) number of modes per dimension
Returns:
eigenfunctions (ndarray): Shape (len(x_points), len(y_points), num_modes)
eigenfunction values on the 2D grid
"""
x_min, x_max, y_min, y_max = domain_bounds
num_basis_x, num_basis_y = num_basis
num_modes = eigenvectors.shape[1]
# Scaled coordinates
x_scaled = (x_points - x_min) / (x_max - x_min)
y_scaled = (y_points - y_min) / (y_max - y_min)
# 1D cosine basis matrices
mode_idx_x = np.arange(num_basis_x).reshape(-1, 1)
mode_idx_y = np.arange(num_basis_y).reshape(-1, 1)
basis_x = np.cos(mode_idx_x * np.pi * x_scaled[None, :]) # (Nx, len(x_points))
basis_y = np.cos(mode_idx_y * np.pi * y_scaled[None, :]) # (Ny, len(y_points))
# Allocate output array
eigenfunctions = np.zeros((len(x_points), len(y_points), num_modes))
# Reconstruct each eigenfunction via tensor contraction
for mode_idx in range(num_modes):
# Reshape flattened coefficients to 2D grid
coeff_grid = eigenvectors[:, mode_idx].reshape(num_basis_x, num_basis_y)
# Tensor product: u(x,y) = Σ_{i,j} c_{ij} φ_i(x) φ_j(y)
# Implemented as: basis_x.T @ coeff_grid @ basis_y
eigenfunctions[:, :, mode_idx] = basis_x.T @ coeff_grid @ basis_y
return eigenfunctions
# ============================================================================
# Utility Functions
# ============================================================================
def sort_indices_by_magnitude(vector):
"""
Return indices that would sort vector by absolute value (ascending).
Args:
vector (ndarray): Input vector
Returns:
sort_indices (ndarray): Permutation indices
"""
abs_values = np.abs(vector)
sort_indices = np.argsort(abs_values)
return sort_indices
[docs]
def compute_prinz_potential_eig(gamma, sigma, dt, eval_right_on=None, num_components=4):
"""
Computes the eigenvalues and eigenfunctions of the Generator of the Overdamped
Langevin dynamics for the Prinz potential.
The generator :math:`\\mathcal{L}` is defined as:
.. math::
\\mathcal{L} = -\\frac{1}{\\gamma} \\nabla V(x) \\cdot \\nabla + \\frac{\\sigma^2}{2\\gamma} \\Delta
where :math:`V(x)` is the Prinz potential. The eigenvalues :math:`\\lambda_i`
and eigenfunctions :math:`\\psi_i(x)` satisfy :math:`\\mathcal{L} \\psi_i = \\lambda_i \\psi_i`.
The Koopman eigenvalues for the discrete-time system are then :math:`e^{\\lambda_i \\Delta t}`.
Parameters
----------
gamma : float
Friction coefficient of the Langevin dynamics.
sigma : float
Noise amplitude of the Langevin dynamics.
dt : float
Time step for the discrete-time Koopman operator.
eval_right_on : ndarray of shape (n_samples, n_features), optional
Data points on which to evaluate the **right** eigenfunctions.
If ``None``, right eigenfunctions are not evaluated.
num_components : int, default=4
Number of dominant eigenvalues and corresponding
eigenfunctions to return.
Returns
-------
eigenvalues : ndarray
The ``num_components`` dominant Koopman eigenvalues
(i.e., :math:`e^{\\lambda_i \\Delta t}`), sorted by magnitude in ascending order.
Shape ``(num_components,)``.
eigenfunctions : ndarray
The leading ``num_components`` right eigenfunctions evaluated at `eval_right_on`` if not
``None``. Shape ``(len(eval_right_on), num_components)``.
Notes
-----
The computation uses a Galerkin projection method with a 1D cosine basis
to discretize the Fokker-Planck operator. The domain for the basis functions
is set to ``(-3, 3)``.
""" # noqa: E501
prinz_grad = lambda x: (
-128 * np.exp(-80 * ((-0.5 + x) ** 2)) * (-0.5 + x)
- 512 * np.exp(-80 * (x**2)) * x
+ 32 * (x**7)
- 160 * np.exp(-40 * ((0.5 + x) ** 2)) * (0.5 + x)
)
Nx = 128
domain = (-3, 3) # A good value for the prinz potential
A, B = assemble_operators_1d(*domain, Nx, prinz_grad, gamma, sigma)
eigvals, eigvecs = eig(A, B)
sorted_idxs = sort_indices_by_magnitude(eigvals)
eigvals = eigvals[sorted_idxs].real
eigvecs = eigvecs[:, sorted_idxs].real
if eval_right_on is not None:
u = eval_eigenfunctions_1d(eigvecs, eval_right_on, *domain)
return np.exp(eigvals * dt)[:num_components], u[:, :num_components]
else:
return np.exp(eigvals * dt)[:num_components]