from math import sqrt
import numpy as np
import pandas as pd
import scipy.integrate
[docs]
def make_duffing(
X0,
n_steps=100,
dt=0.01,
alpha=0.5,
beta=0.0625,
gamma=0.1,
delta=2.5,
omega=2.0,
):
"""
Generate a trajectory from the Duffing oscillator.
The Duffing oscillator is a damped driven nonlinear harmonic oscillator
commonly used to study chaotic dynamics in physics and engineering.
The system is governed by the differential equation:
.. math::
x'' + \\delta x' + \\alpha x + \\beta x^3 = \\gamma \\text{cos}(\\omega t).
Parameters
----------
X0 : array-like, shape (2,)
Initial conditions [position, velocity].
n_steps : int, default=100
Number of time steps to simulate.
dt : float, default=0.01
Time step size for numerical integration.
alpha : float, default=0.5
Linear stiffness coefficient.
beta : float, default=0.0625
Nonlinear stiffness coefficient (cubic term).
gamma : float, default=0.1
Amplitude of the driving force.
delta : float, default=2.5
Damping coefficient.
omega : float, default=2.0
Angular frequency of the driving force.
Returns
-------
df : pandas.DataFrame
Trajectory of the oscillator with columns ``['position', 'velocity']``
and ``n_steps + 1`` samples. Has a MultiIndex with levels ``['step', 'time']``.
Metadata stored in ``df.attrs`` includes:
- ``'generator'``: ``'make_duffing'``;
- ``'X0'``: initial conditions;
- ``'params'``: dict of all parameters.
Examples
--------
>>> import numpy as np
>>> X0 = np.array([0.0, 0.0])
>>> df = make_duffing(X0, n_steps=1000, dt=0.01)
>>> df.shape
(1001, 2)
>>> df.columns.tolist()
['position', 'velocity']
>>> df.attrs['generator']
'make_duffing'
>>> # Access time values from index
>>> times = df.index.get_level_values('time')
>>> # Generate chaotic trajectory
>>> df_chaotic = make_duffing(
... X0=[0.1, 0.1],
... n_steps=5000,
... alpha=-1.0,
... beta=1.0,
... gamma=0.3,
... delta=0.25,
... omega=1.0
... )
"""
X0 = np.asarray(X0)
if X0.shape != (2,):
raise ValueError(f"X0 must have shape (2,), got {X0.shape}")
def duffing_rhs(t, x):
"""Right-hand side of the Duffing oscillator ODE."""
return np.array(
[
x[1],
-delta * x[1]
- alpha * x[0]
- beta * x[0] ** 3
+ gamma * np.cos(omega * t),
]
)
t_eval = np.arange(0, (n_steps + 1) * dt, dt)
t_span = (0, t_eval[-1])
sol = scipy.integrate.solve_ivp(
duffing_rhs, t_span, X0, t_eval=t_eval, method="RK45"
)
# Create MultiIndex: (step, time)
step_index = np.arange(len(sol.t))
index = pd.MultiIndex.from_arrays([step_index, sol.t], names=["step", "time"])
# Create DataFrame
df = pd.DataFrame(sol.y.T, columns=["position", "velocity"], index=index)
# Store metadata
df.attrs = {
"generator": "make_duffing",
"X0": X0.tolist(),
"params": {
"n_steps": n_steps,
"dt": dt,
"alpha": alpha,
"beta": beta,
"gamma": gamma,
"delta": delta,
"omega": omega,
},
}
return df
[docs]
def make_lorenz63(
X0,
n_steps=100,
dt=0.01,
sigma=10.0,
mu=28.0,
beta=2.667, # 8/3
):
"""
Generate a trajectory from the Lorenz-63 system.
The Lorenz-63 system is a simplified mathematical model of atmospheric
convection that exhibits chaotic behavior. It is one of the most famous
examples of deterministic chaos.
The system is governed by:
.. math::
\\begin{cases}
\\frac{dx}{dt} = \\sigma(y - x) \\\\
\\frac{dy}{dt} = x(\\mu - z) - y \\\\
\\frac{dz}{dt} = xy - \\beta z
\\end{cases}
Parameters
----------
X0 : array-like, shape (3,)
Initial conditions ``[x, y, z]``.
n_steps : int, default=100
Number of time steps to simulate.
dt : float, default=0.01
Time step size for numerical integration.
sigma : float, default=10.0
The :math:`\\sigma` parameter, controlling the ratio of the rate of
heat conduction to the rate of convection.
mu : float, default=28.0
The :math:`\\mu` parameter (also called :math:`\\rho`), representing
the Rayleigh number.
beta : float, default=8/3
The :math:`\\beta` parameter, related to the physical dimensions of
the convection layer.
Returns
-------
df : pandas.DataFrame
Trajectory of the Lorenz-63 system with columns ``['x', 'y', 'z']`` and
``n_steps + 1`` samples.
Has a MultiIndex with levels ``['step', 'time']``.
Metadata stored in ``df.attrs`` includes:
- ``'generator'``: ``'make_lorenz63'``;
- ``'X0'``: initial conditions;
- ``'params'``: dict of all parameters.
Examples
--------
>>> import numpy as np
>>> X0 = np.array([1.0, 0.0, 0.0])
>>> df = make_lorenz63(X0, n_steps=1000, dt=0.01)
>>> df.shape
(1001, 3)
>>> df.columns.tolist()
['x', 'y', 'z']
>>> df.attrs['generator']
'make_lorenz63'
Access time values from index:
>>> times = df.index.get_level_values('time')
Generate classic chaotic trajectory from near the attractor:
>>> df_chaotic = make_lorenz63(
... X0=[0.0, 1.0, 1.05],
... n_steps=10000,
... dt=0.01,
... sigma=10.0,
... mu=28.0,
... beta=8.0/3.0
... )
"""
X0 = np.asarray(X0)
if X0.shape != (3,):
raise ValueError(f"X0 must have shape (3,), got {X0.shape}")
# Precompute linearized matrix for efficiency
M_lin = np.array([[-sigma, sigma, 0], [mu, 0, 0], [0, 0, -beta]])
def lorenz63_rhs(t, x):
"""Right-hand side of the Lorenz-63 system."""
dx = M_lin @ x
dx[1] -= x[2] * x[0]
dx[2] += x[0] * x[1]
return dx
t_eval = np.arange(0, (n_steps + 1) * dt, dt)
t_span = (0, t_eval[-1])
sol = scipy.integrate.solve_ivp(
lorenz63_rhs, t_span, X0, t_eval=t_eval, method="RK45"
)
# Create MultiIndex: (step, time)
step_index = np.arange(len(sol.t))
index = pd.MultiIndex.from_arrays([step_index, sol.t], names=["step", "time"])
# Create DataFrame
df = pd.DataFrame(sol.y.T, columns=["x", "y", "z"], index=index)
# Store metadata
df.attrs = {
"generator": "make_lorenz63",
"X0": X0.tolist(),
"params": {
"n_steps": n_steps,
"dt": dt,
"sigma": sigma,
"mu": mu,
"beta": beta,
},
}
return df
[docs]
def make_linear_system(
X0,
A,
n_steps=100,
noise=0.0,
dt=1.0,
random_state=None,
):
"""
Generate a trajectory from a discrete-time linear dynamical system.
The linear dynamical system is governed by:
.. math::
x_{t+1} = A x_t + \\xi_t
where :math:`\\xi_t \\sim \\mathcal{N}(0, \\sigma^2 I)` is zero-mean
Gaussian noise with standard deviation :math:`\\sigma`.
For this linear system, the Koopman operator is simply the transpose
of :math:`A`.
Parameters
----------
X0 : array-like, shape (d,)
Initial conditions, where ``d`` is the system dimension.
A : array-like, shape (d, d)
State transition matrix defining the linear dynamics.
n_steps : int, default=100
Number of time steps to simulate.
noise : float, default=0.0
Standard deviation of the zero-mean Gaussian noise :math:`\\sigma`.
dt : float, default=1.0
Time step size. For discrete-time systems, this is typically 1.0.
random_state : int, RandomState instance or None, default=None
Controls the random number generation for the noise.
Pass an ``int`` for reproducible output across multiple function calls.
Returns
-------
df : pandas.DataFrame
Trajectory of the linear system with columns ``['x0', 'x1', ..., 'x{d-1}']``
and ``n_steps + 1`` samples.
Has a MultiIndex with levels ``['step', 'time']``.
Metadata stored in ``df.attrs`` includes:
- ``'generator'``: ``'make_linear_system'``;
- ``'X0'``: initial conditions;
- ``'A'``: state transition matrix;
- ``'params'``: dict of all parameters.
Examples
--------
>>> import numpy as np
Stable 2D linear system:
>>> A = np.array([[0.9, 0.1], [-0.1, 0.9]])
>>> X0 = np.array([1.0, 0.0])
>>> df = make_linear_system(X0, A, n_steps=100)
>>> df.shape
(101, 2)
>>> df.columns.tolist()
['x0', 'x1']
With Gaussian noise:
>>> df_noisy = make_linear_system(
... X0=[1.0, 0.0],
... A=A,
... n_steps=100,
... noise=0.1,
... random_state=42
... )
Unstable system (eigenvalues > 1):
>>> A_unstable = np.array([[1.1, 0.0], [0.0, 1.1]])
>>> df_unstable = make_linear_system(
... X0=[0.1, 0.1],
... A=A_unstable,
... n_steps=50
... )
Rotation matrix (periodic dynamics):
>>> theta = np.pi / 4
>>> A_rotation = np.array([
... [np.cos(theta), -np.sin(theta)],
... [np.sin(theta), np.cos(theta)]
... ])
>>> df_rotation = make_linear_system(
... X0=[1.0, 0.0],
... A=A_rotation,
... n_steps=100
... )
Access the Koopman operator (transpose of A):
>>> K = np.array(df.attrs['A']).T
Notes
-----
The Koopman operator for this linear system is the transpose of the
state transition matrix :math:`A`. This makes linear systems ideal
test cases for Koopman operator learning algorithms.
"""
X0 = np.asarray(X0)
A = np.asarray(A)
if X0.ndim != 1:
raise ValueError(f"X0 must be 1-dimensional, got shape {X0.shape}")
if A.ndim != 2:
raise ValueError(f"A must be 2-dimensional, got shape {A.shape}")
if A.shape[0] != A.shape[1]:
raise ValueError(f"A must be square, got shape {A.shape}")
d = A.shape[0]
if X0.shape[0] != d:
raise ValueError(f"X0 dimension ({X0.shape[0]}) must match A dimension ({d})")
# Initialize random number generator
rng = np.random.default_rng(random_state)
# Preallocate trajectory array
X = np.zeros((n_steps + 1, d))
X[0] = X0
# Simulate trajectory
for i in range(n_steps):
X[i + 1] = A @ X[i] + noise * rng.standard_normal(size=d)
# Create time array
t = np.arange(n_steps + 1) * dt
# Create MultiIndex: (step, time)
step_index = np.arange(n_steps + 1)
index = pd.MultiIndex.from_arrays([step_index, t], names=["step", "time"])
# Create DataFrame
columns = [f"x{i}" for i in range(d)]
df = pd.DataFrame(X, columns=columns, index=index)
# Store metadata
df.attrs = {
"generator": "make_linear_system",
"X0": X0.tolist(),
"A": A.tolist(),
"params": {
"n_steps": n_steps,
"noise": noise,
"dt": dt,
"random_state": random_state,
},
}
return df
[docs]
def make_logistic_map(
X0,
n_steps=100,
r=4.0,
M=10,
dt=1.0,
random_state=None,
):
"""
Generate a trajectory from the logistic map with optional trigonometric noise
:cite:t:`make_logistic_map-ostruszka2000dynamical`.
The logistic map is a discrete-time dynamical system defined by:
.. math::
x_{t+1} = r x_t (1 - x_t) + \\xi_t
where :math:`\\xi_t` is drawn from a trigonometric noise distribution
when ``M > 0``.
The classic chaotic logistic map uses :math:`r = 4`. For this system
with trigonometric noise, the eigenfunctions of the Koopman operator
can be computed analytically using the basis:
.. math::
\\phi_i(x) = c_i \\sin^{2M-i}(\\pi x) \\cos^i(\\pi x)
for :math:`i = 0, 1, \\ldots, 2M`.
Parameters
----------
X0 : float or array-like, shape (1,)
Initial condition. Must be in ``[0, 1]`` for standard logistic map.
n_steps : int, default=100
Number of time steps to simulate.
r : float, default=4.0
Growth rate parameter. The classic chaotic regime is at ``r = 4``.
M : int, default=10
Order of the trigonometric noise distribution. Higher ``M`` makes
the noise distribution more peaked around zero. If ``M <= 0``, no
noise is added.
dt : float, default=1.0
Time step size. For discrete-time systems, this is typically 1.0.
random_state : int, RandomState instance or None, default=None
Controls the random number generation for the noise.
Pass an ``int`` for reproducible output across multiple function calls.
Returns
-------
df : pandas.DataFrame
Trajectory of the logistic map with column ``['x']`` and ``n_steps + 1`` samples.
Has a MultiIndex with levels ``['step', 'time']``.
Metadata stored in ``df.attrs`` includes:
- ``'generator'``: ``'make_logistic_map'``;
- ``'X0'``: initial condition;
- ``'params'``: dict of all parameters.
Examples
--------
>>> import numpy as np
Classic chaotic logistic map:
>>> df = make_logistic_map(X0=0.1, n_steps=100, r=4.0)
>>> df.shape
(101, 1)
>>> df.columns.tolist()
['x']
With trigonometric noise:
>>> df_noisy = make_logistic_map(
... X0=0.1,
... n_steps=1000,
... r=4.0,
... M=10,
... random_state=42
... )
Different growth rates:
>>> # Period-2 orbit (r ≈ 3.2)
>>> df_period2 = make_logistic_map(X0=0.5, n_steps=100, r=3.2)
>>>
>>> # Chaotic (r = 4.0)
>>> df_chaotic = make_logistic_map(X0=0.5, n_steps=100, r=4.0)
Access metadata:
>>> df.attrs['params']['M']
10
Notes
-----
The trigonometric noise distribution has PDF:
.. math::
p(\\xi) = \\frac{\\pi}{B(M+0.5, 0.5)} \\cos^{2M}(\\pi \\xi)
for :math:`\\xi \\in [-0.5, 0.5]`, where :math:`B` is the beta function.
For the noisy logistic map with :math:`r = 4`, the Koopman operator
has known eigenfunctions that can be computed using the companion
function :func:``kooplearn.datasets.compute_logistic_map_eig``.
"""
# Handle X0
X0 = np.asarray(X0).flatten()
if X0.size != 1:
raise ValueError(
f"X0 must be scalar or 1D array with 1 element, got {X0.shape}"
)
X0 = float(X0[0])
if not 0 <= X0 <= 1:
raise ValueError(f"X0 must be in [0, 1] for standard logistic map, got {X0}")
# Initialize random number generator
rng = np.random.default_rng(random_state)
# Setup noise generator if needed
if M > 0:
from kooplearn.datasets._logistic_map import make_noise_rng
noise_rng = make_noise_rng(M, rng)
else:
noise_rng = None
# Initialize trajectory
# Preallocate trajectory array
X = np.zeros(n_steps + 1)
X[0] = X0
# Simulate trajectory
for i in range(n_steps):
# Logistic map
X[i + 1] = r * X[i] * (1 - X[i])
# Add noise if specified
if noise_rng is not None:
xi = noise_rng.rvs()
X[i + 1] = np.clip(X[i + 1] + xi, 0, 1)
# Create time array
t = np.arange(n_steps + 1) * dt
# Create MultiIndex: (step, time)
step_index = np.arange(n_steps + 1)
index = pd.MultiIndex.from_arrays([step_index, t], names=["step", "time"])
# Create DataFrame
df = pd.DataFrame(X, columns=["x"], index=index)
# Store metadata
df.attrs = {
"generator": "make_logistic_map",
"X0": X0,
"params": {
"n_steps": n_steps,
"r": r,
"M": M,
"dt": dt,
"random_state": random_state,
},
}
return df
[docs]
def make_regime_switching_var(
X0,
phi1,
phi2,
transition,
n_steps=100,
dt=1.0,
noise=0.0,
random_state=None,
):
"""
Generate a trajectory from a regime-switching vector autoregressive (VAR) process.
This model alternates between two linear dynamical regimes according to a
Markov transition matrix. At each step, the system evolves according to one
of two dynamics matrices ``phi1`` or ``phi2``, with optional Gaussian noise.
Mathematically, the system evolves as:
.. math::
x_{t+1} = \\Phi_{s_t} x_t + \\epsilon_t, \\quad
\\epsilon_t \\sim \\mathcal{N}(0, \\sigma^2 I)
where ``s_t`` is the active regime (0 or 1), evolving according to a
2x2 Markov transition matrix ``P`` such that
.. math::
P_{ij} = \\mathbb{P}(s_{t+1} = j \\mid s_t = i).
Parameters
----------
X0 : array-like of shape (n_features,)
Initial state of the system.
phi1 : ndarray of shape (n_features, n_features)
Dynamics matrix for regime 0.
phi2 : ndarray of shape (n_features, n_features)
Dynamics matrix for regime 1.
transition : ndarray of shape (2, 2)
Row-stochastic Markov transition matrix defining the probabilities
of switching between regimes. ``transition[i, j]`` gives the
probability of transitioning from regime ``i`` to ``j``.
n_steps : int, default=100
Number of discrete time steps to simulate.
dt : float, default=1.0
Time step size for discrete simulation. Added as metadata in the output.
noise : float, default=0.0
Standard deviation of Gaussian noise added at each step.
random_state : int, RandomState instance or None, default=None
Controls the random number generation for the noise.
Pass an ``int`` for reproducible output across multiple function calls.
Returns
-------
df : pandas.DataFrame
Generated trajectory with columns ``['x0', 'x1', ..., 'x{n_features-1}']``
and ``n_steps + 1`` samples. Has a MultiIndex with levels ``['step', 'time']``.
The following metadata are stored in ``df.attrs``:
- ``'generator'``: ``'make_regime_switching_var'``;
- ``'X0'``: initial state;
- ``'params'``: dictionary with all model parameters;
- ``'regimes'``: integer array of active regimes over time.
Examples
--------
>>> import numpy as np
>>> from kooplearn.datasets import make_regime_switching_var
>>> phi1 = np.array([[0.9, 0.1], [0.0, 0.8]])
>>> phi2 = np.array([[0.5, -0.2], [0.3, 0.7]])
>>> transition = np.array([[0.95, 0.05], [0.1, 0.9]])
>>> X0 = np.zeros(2)
>>> df = make_regime_switching_var(X0, phi1, phi2, transition, n_steps=100, noise=0.01)
"""
X0 = np.asarray(X0, dtype=float)
n_features = X0.shape[0]
# Validate shapes
phi1 = np.asarray(phi1, dtype=float)
phi2 = np.asarray(phi2, dtype=float)
transition = np.asarray(transition, dtype=float)
if phi1.shape != (n_features, n_features):
raise ValueError(
f"`phi1` must have shape {(n_features, n_features)}, got {phi1.shape}."
)
if phi2.shape != (n_features, n_features):
raise ValueError(
f"`phi2` must have shape {(n_features, n_features)}, got {phi2.shape}."
)
if transition.shape != (2, 2):
raise ValueError("`transition` must be a 2x2 matrix.")
if not np.allclose(transition.sum(axis=1), 1.0):
raise ValueError("Rows of `transition` must sum to 1.")
rng = np.random.default_rng(random_state)
phi = [phi1, phi2]
# Allocate trajectory and regime arrays
data = np.zeros((n_steps + 1, n_features))
regimes = np.zeros(n_steps + 1, dtype=int)
data[0] = X0
current_regime = 0
for t in range(n_steps):
noise_vec = noise * rng.standard_normal(size=n_features)
X_next = phi[current_regime] @ data[t] + noise_vec
data[t + 1] = X_next
# Draw next regime
current_regime = rng.choice([0, 1], p=transition[current_regime])
regimes[t + 1] = current_regime
# Time and step index
step_index = np.arange(n_steps + 1)
time_index = step_index * dt
index = pd.MultiIndex.from_arrays([step_index, time_index], names=["step", "time"])
columns = [f"x{i}" for i in range(n_features)]
df = pd.DataFrame(data, columns=columns, index=index)
# Metadata
df.attrs = {
"generator": "make_regime_switching_var",
"X0": X0.tolist(),
"params": {
"phi1": phi1,
"phi2": phi2,
"transition": transition,
"n_steps": n_steps,
"dt": dt,
"noise": noise,
"random_state": random_state,
},
"regimes": regimes,
}
return df
[docs]
def make_prinz_potential(
X0,
n_steps=10000,
dt=1e-4,
gamma=0.1,
sigma=sqrt(2.0),
random_state=None,
):
"""
Generate a 1D Langevin trajectory for the "Prinz potential"
:cite:t:`make_prinz_potential-Prinz2011`.
This quadruple-well potential exhibits three metastable states separated by
energy barriers. The dynamics follow the (discretized) overdamped Langevin equation:
.. math::
X_{t + 1} = X_t -\\frac{1}{\\gamma}\\nabla V{X_t}\\Delta t + \\frac{\\sigma}{\\gamma}\\sqrt{\\Delta t}\\xi_t,
where :math:`\\xi_t` is a Gaussian white noise process with zero mean and unit variance,
:math:`\\gamma` is the friction coefficient, and :math:`k_B T = \\frac{\\sigma^2}{2\\gamma}`
determines the thermal energy scale.
The potential is defined as:
.. math::
V(x) = 32 x^8 - 256 e^{-80 x^2} - 80 e^{-40 (x + 0.5)^2}
- 128 e^{-80 (x - 0.5)^2}.
Parameters
----------
X0 : float or array-like of shape (1,)
Initial position.
n_steps : int, default=10000
Number of discrete time steps to simulate.
dt : float, default=1e-4
Time step size for Euler–Maruyama integration.
gamma : float, default=0.1
Friction coefficient.
sigma : float, default=:math:`\\sqrt{2}`
Noise variance, corresponding to a thermal energy scale
:math:`k_B T = \\frac{\\sigma^2}{2\\gamma}`.
random_state : int, RandomState instance or None, default=None
Controls the random number generation for the noise.
Pass an ``int`` for reproducible output across multiple function calls.
Returns
-------
df : pandas.DataFrame
Trajectory of the particle with column ``['x']`` and ``n_steps + 1`` samples.
Indexed by a MultiIndex with levels ``['step', 'time']``.
Metadata stored in ``df.attrs`` includes:
- ``'generator'``: ``'make_prinz_potential'``;
- ``'X0'``: initial condition;
- ``'params'``: dictionary of all parameters.
Examples
--------
>>> import numpy as np
>>> from kooplearn.datasets import make_prinz_potential
>>> df = make_prinz_potential(X0=0.0, n_steps=5000, dt=1e-4)
""" # noqa: E501
X0 = np.atleast_1d(np.asarray(X0, dtype=float))
if X0.shape != (1,):
raise ValueError(f"X0 must have shape (1,), got {X0.shape}")
X0 = X0[0]
rng = np.random.default_rng(random_state)
inv_gamma = 1.0 / gamma
def force_fn(x):
"""Force corresponding to the triple-well potential."""
return -1.0 * (
-128 * np.exp(-80 * ((-0.5 + x) ** 2)) * (-0.5 + x)
- 512 * np.exp(-80 * (x**2)) * x
+ 32 * (x**7)
- 160 * np.exp(-40 * ((0.5 + x) ** 2)) * (0.5 + x)
)
X = np.zeros(n_steps + 1)
X[0] = X0
sqrt_term = inv_gamma * sigma * np.sqrt(dt)
for t in range(n_steps):
F = force_fn(X[t])
xi = rng.standard_normal()
X[t + 1] = X[t] + inv_gamma * F * dt + sqrt_term * xi
# MultiIndex (step, time)
step_index = np.arange(n_steps + 1)
time_index = step_index * dt
index = pd.MultiIndex.from_arrays([step_index, time_index], names=["step", "time"])
df = pd.DataFrame(X[:, None], columns=["x"], index=index)
df.attrs = {
"generator": "make_prinz_potential",
"X0": X0.tolist(),
"params": {
"n_steps": n_steps,
"dt": dt,
"gamma": gamma,
"sigma": sigma,
"random_state": random_state,
},
}
return df