Logistic Map¶
Authors: Erfan Mirzaei, Giacomo Turri, and Pietro Novelli
In this notebook, we reproduce the Logistic Map experiment from {footcite:t}Kostic2023DPNets using the kooplearn library. The experiment investigates the challenge of learning representations of the noisy logistic map, a one-dimensional dynamical system defined as
where \(\xi_t\) is an i.i.d. trigonometric noise term with density \(\propto \cos^N(x)\) (for even integer \(N\)), and \(r\) is a positive parameter controlling the map’s dynamics. Here, we set \(r = 4\), a case for which the exact solution is known.
The corresponding transfer operator has rank \(N + 1\), is non-normal, and admits closed-form eigenvalues and eigenfunctions {footcite:t}Kostic2022. Since \(\mathcal{T}\) is non-normal, learning its spectral decomposition is particularly challenging {footcite:t}Kostic2023SpectralRates. This makes the logistic map a valuable benchmark for testing learned representations. Because the exact form of \(\mathcal{T}\) is available, we can bypass operator regression and focus solely on evaluating the quality of the learned representation.
Dataset¶
To learn the logistic map, we first need to construct the dataset. The process involves the following steps:
Sample from the dynamical system to generate the training, validation, and test trajectories;
Prepare a dataloaders for use with PyTorch.
[1]:
# Defining the number of samples for each data split
n_train_samples = 10000
n_val_samples = 1000
n_test_samples = 1000
The kooplearn library provides the function make_logistic_map to generate trajectories from the logistic map. This function takes as input an initial condition, X0, and the number of samples, num_samples, and returns the trajectory starting from X0 and evolving for the specified number of steps. The resulting trajectory therefore has length num_samples + 1.
Once the trajectory is generated, we can split it into training, validation, and test subsets for subsequent experiments.
[2]:
import numpy as np
import scipy
import torch
from kooplearn.datasets import make_logistic_map
# Defining logistic map "hyperparameters" and generating the trajectory
M = 20
random_state = 42
traj = make_logistic_map(
X0=0.5,
n_steps=n_train_samples + n_val_samples + n_test_samples,
M=M,
random_state=0,
) # Setting the random_state for reproducibility
dataset = {
"train": traj[:n_train_samples],
"validation": traj[n_train_samples : n_train_samples + n_val_samples],
"test": traj[n_train_samples + n_val_samples :],
}
train_data = torch.from_numpy(dataset["train"].values).float()
val_data = torch.from_numpy(dataset["validation"].values).float()
[3]:
from torch.utils.data import DataLoader, TensorDataset
batch_size = 128
# Creating PyTorch TensorDatasets
train_ds = TensorDataset(train_data[:-1], train_data[1:])
val_ds = TensorDataset(val_data[:-1], val_data[1:])
# Creating DataLoaders
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=len(val_ds), shuffle=False)
Learning an appropriate feature map¶
In the next step, we implement the code needed to train the feature maps used to approximate the evolution (transfer) operator.
[4]:
# Experiment hyperparameters
learning_rate = 2e-4
opt = torch.optim.AdamW
num_epochs = 50
layer_dims = [64, 128, 64]
latent_dim = 8
Sinusoidal Embedding¶
The logistic map is defined on the interval \([0, 1]\) and is inherently periodic. To account for this property, we featurize the signal using trigonometric functions, which naturally encode periodicity. Specifically, we use \(\sin(2\pi x)\) and \(\cos(2\pi x)\) as the embedding features.
[5]:
class SinusoidalEmbedding(torch.nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
# Assuming x is in [0, 1]
x = 2 * torch.pi * x
return torch.cat([torch.sin(x), torch.cos(x)], dim=-1)
Creating an MLP using PyTorch¶
Next, we use the PyTorch library to define our neural network with the specified hyperparameters:
[6]:
class SimpleMLP(torch.nn.Module):
def __init__(
self, latent_dim: int, layer_dims: list[int], activation=torch.nn.LeakyReLU
):
super().__init__()
self.activation = activation
lin_dims = (
[2] + layer_dims + [latent_dim]
) # The 2 is for the sinusoidal embedding
layers = []
for layer_idx in range(len(lin_dims) - 2):
layers.append(
torch.nn.Linear(
lin_dims[layer_idx], lin_dims[layer_idx + 1], bias=False
)
)
layers.append(activation())
layers.append(torch.nn.Linear(lin_dims[-2], lin_dims[-1], bias=True))
self.layers = torch.nn.ModuleList(layers)
self.sin_embedding = SinusoidalEmbedding()
def forward(self, x):
# Sinusoidal embedding
x = self.sin_embedding(x).float()
# MLP
for layer in self.layers:
x = layer(x)
return x
[ ]:
from kooplearn.linear_model import Ridge
from kooplearn.torch.utils import FeatureMapEmbedder
device = "cuda" if torch.cuda.is_available() else "cpu"
class FeatureMap(torch.nn.Module):
def __init__(self, latent_dim: int, normalize_latents: bool = True):
super().__init__()
self.normalize_latents = normalize_latents
self.backbone = SimpleMLP(latent_dim=latent_dim, layer_dims=layer_dims)
self.lin = torch.nn.Linear(latent_dim, latent_dim, bias=False)
def forward(self, X, lagged: bool = False):
z = self.backbone(X)
if self.normalize_latents:
z = torch.nn.functional.normalize(z, dim=-1)
if lagged:
z = self.lin(z)
return z
def train_encoder_only(criterion: torch.nn.Module):
torch.manual_seed(random_state)
# Initialize model, loss and optimizer
model = FeatureMap(latent_dim).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
def step(batch, is_train: bool = True):
batch_X, batch_Y = batch
batch_X, batch_Y = batch_X.to(device), batch_Y.to(device)
if is_train:
optimizer.zero_grad()
phi_X, phi_Y = model(batch_X), model(batch_Y, lagged=True)
loss = criterion(phi_X, phi_Y)
if is_train:
loss.backward()
optimizer.step()
return loss.item()
for epoch in range(num_epochs):
# Training phase
model.train()
train_loss = []
for batch in train_dl:
train_loss.append(step(batch))
# Validation phase
model.eval()
val_loss = []
with torch.no_grad():
for batch in val_dl:
val_loss.append(step(batch, is_train=False))
if (epoch + 1) % 5 == 0 or (epoch == 0):
print(
f"EPOCH {epoch + 1:>2} Loss: {np.mean(train_loss):.2f} (train) - " \
f"{np.mean(val_loss):.2f} (val)"
)
embedder = FeatureMapEmbedder(encoder=model)
evolution_operator_model = Ridge(n_components=latent_dim).fit(
embedder.transform(train_data), train_data.numpy(force=True)
)
return {
"model": evolution_operator_model,
"feature_map": embedder.transform,
}
[8]:
from kooplearn.torch.nn import SpectralContrastiveLoss, VampLoss
trained_models = {}
for name, criterion in zip(
["VAMPNets", "Spectral Contrastive Loss"],
[VampLoss(center_covariances=False), SpectralContrastiveLoss()],
):
print(f"Fitting {name}")
trained_models[name] = train_encoder_only(criterion)
Fitting VAMPNets
EPOCH 1 Loss: -3.23 (train) - -3.07 (val)
EPOCH 5 Loss: -3.29 (train) - -3.11 (val)
EPOCH 10 Loss: -3.30 (train) - -3.14 (val)
EPOCH 15 Loss: -3.31 (train) - -3.14 (val)
EPOCH 20 Loss: -3.30 (train) - -3.13 (val)
EPOCH 25 Loss: -3.20 (train) - -3.00 (val)
EPOCH 30 Loss: -3.10 (train) - -3.05 (val)
EPOCH 35 Loss: -3.00 (train) - -2.80 (val)
EPOCH 40 Loss: -2.95 (train) - -2.82 (val)
EPOCH 45 Loss: -2.95 (train) - -2.81 (val)
Warning: The fitting algorithm discarded 3 dimensions of the 8 requested out of numerical instabilities.
The rank attribute has been updated to 5.
Consider decreasing the rank parameter.
EPOCH 50 Loss: -2.94 (train) - -2.83 (val)
Fitting Spectral Contrastive Loss
EPOCH 1 Loss: -0.73 (train) - -1.02 (val)
EPOCH 5 Loss: -1.32 (train) - -1.35 (val)
EPOCH 10 Loss: -1.59 (train) - -1.61 (val)
EPOCH 15 Loss: -1.82 (train) - -1.84 (val)
EPOCH 20 Loss: -2.04 (train) - -2.06 (val)
EPOCH 25 Loss: -2.26 (train) - -2.28 (val)
EPOCH 30 Loss: -2.45 (train) - -2.46 (val)
EPOCH 35 Loss: -2.62 (train) - -2.65 (val)
EPOCH 40 Loss: -2.77 (train) - -2.79 (val)
EPOCH 45 Loss: -2.88 (train) - -2.91 (val)
Warning: The fitting algorithm discarded 2 dimensions of the 8 requested out of numerical instabilities.
The rank attribute has been updated to 6.
Consider decreasing the rank parameter.
EPOCH 50 Loss: -2.99 (train) - -3.00 (val)
Determistic feature map based on Chebyshev Polynomials¶
[9]:
class ChebyT:
def __init__(self, feature_dim: int = 8):
self.feature_dim = feature_dim
def __call__(self, x):
vals = [self.scaled_chebyt(n, x) for n in range(self.feature_dim)]
return np.concatenate(vals, axis=-1)
def scaled_chebyt(self, n, x):
return scipy.special.eval_chebyt(n, 2 * x - 1)
cheby_model = Ridge(n_components=latent_dim).fit(
ChebyT(feature_dim=latent_dim)(dataset["train"].values))
trained_models["ChebyT"] = {
"model": cheby_model,
"feature_map": ChebyT(feature_dim=latent_dim),
}
Evaluation Metrics¶
Now it is time to evaluate the performance of model. To do this, for this specific example we will use two different metrics that are used in the paper: (i) the optimality gap, and (ii) the spectral error.
Optimality Gap
The definition of the optimality gap in the paper is $ \sum{i = 1}^{3} :nbsphinx-math:`sigma`{i}^{2}(\tau) - \mathcal{P}`^{0}(w)$, which informs on how close one is to capture the best rank-r approximation of the transfer operator, :math:mathcal{T}`.
Spectral Error
The spectral error is calculated by $ max_i min_j \left `\| :nbsphinx-math:lambda`i(:nbsphinx-math:`mathcal{P}`{\mathcal{H}}:nbsphinx-math:tau_{|:nbsphinx-math:mathcal{H}}) - \lambda_j(\tau) \right `\|$. This formula measures how well the true eigenvalues of :math:mathcal{T}` can be recovered within the representation space \(\mathcal{H_w}\).
In order to calculate the above metrics for evaluation the performance of the learned representation we should first calculate populational covariances and cross-covariances for the learned feature map.
The function get_population_covs computes population covariance and cross-covariance matrices relative to a given feature map.
[10]:
from kooplearn.datasets import (
compute_logistic_map_eig,
compute_logistic_map_invariant_pdf,
)
from kooplearn.datasets._logistic_map import TrigonometricNoise, logistic_map
invariant_pdf = compute_logistic_map_invariant_pdf(M=M)
transition_pdf = TrigonometricNoise(M=M).pdf
def get_population_covs(feature_map, integration_points=2**12 + 1):
# Covariance:
x = np.linspace(0, 1, integration_points)
dx = x[1] - x[0]
# Covariance:
phi_X = feature_map(x[:, None])
cov_X = np.einsum("xi,xj,x->xij", phi_X, phi_X, invariant_pdf(x))
cov_X = scipy.integrate.romb(cov_X, dx=dx, axis=0)
# Cross-Covariance:
X, Y = np.meshgrid(x, x, indexing="ij")
cov_XY = np.einsum(
"xi,yj,x,xy->xyij",
phi_X,
phi_X,
invariant_pdf(x),
transition_pdf(logistic_map(X) - Y),
)
cov_XY = scipy.integrate.romb(cov_XY, dx=dx, axis=0)
cov_XY = scipy.integrate.romb(cov_XY, dx=dx, axis=0)
return cov_X, cov_XY
[ ]:
from kooplearn.metrics import directed_hausdorff_distance
ref_eigs = compute_logistic_map_eig(M=M)
for key, model_dict in trained_models.items():
feature_map = model_dict["feature_map"]
cov, cross_cov = get_population_covs(feature_map)
OLS_eigs = scipy.linalg.eigvals(cross_cov, cov)
empirical_OLS_eigs = model_dict["model"].eig()
spectral_dist = directed_hausdorff_distance(OLS_eigs, ref_eigs)
empirical_spectral_dist = directed_hausdorff_distance(empirical_OLS_eigs, ref_eigs)
print(
f"{key} - Spectral Dist: {spectral_dist:.4f} - Empirical Spectral Dist: " \
f"{empirical_spectral_dist:.4f}"
)
VAMPNets - Spectral Dist: 0.2301 - Empirical Spectral Dist: 0.4664
Spectral Contrastive Loss - Spectral Dist: 0.1801 - Empirical Spectral Dist: 0.3862
ChebyT - Spectral Dist: 0.0789 - Empirical Spectral Dist: 0.2755