Logistic Map

In this notebook, we reproduce the Logistic Map experiment from {footcite:t}Kostic2023DPNets using the kooplearn library. The experiment investigates the challenge of learning representations of the noisy logistic map, a one-dimensional dynamical system defined as

\[x_{t + 1} = (rx_{t}(1 - x_{t}) + \xi_{t}) \mod 1,\]

where \(\xi_t\) is an i.i.d. trigonometric noise term with density \(\propto \cos^N(x)\) (for even integer \(N\)), and \(r\) is a positive parameter controlling the map’s dynamics. Here, we set \(r = 4\), a case for which the exact solution is known.

The corresponding transfer operator has rank \(N + 1\), is non-normal, and admits closed-form eigenvalues and eigenfunctions {footcite:t}Kostic2022. Since \(\mathcal{T}\) is non-normal, learning its spectral decomposition is particularly challenging {footcite:t}Kostic2023SpectralRates. This makes the logistic map a valuable benchmark for testing learned representations. Because the exact form of \(\mathcal{T}\) is available, we can bypass operator regression and focus solely on evaluating the quality of the learned representation.

Dataset

To learn the logistic map, we first need to construct the dataset. The process involves the following steps:

  • Sample from the dynamical system to generate the training, validation, and test trajectories;

  • Prepare a dataloaders for use with PyTorch.

[1]:
# Defining the number of samples for each data split
n_train_samples = 10000
n_val_samples = 1000
n_test_samples = 1000

The kooplearn library provides the function make_logistic_map to generate trajectories from the logistic map. This function takes as input an initial condition, X0, and the number of samples, num_samples, and returns the trajectory starting from X0 and evolving for the specified number of steps. The resulting trajectory therefore has length num_samples + 1.

Once the trajectory is generated, we can split it into training, validation, and test subsets for subsequent experiments.

[2]:
import numpy as np
import scipy
import torch

from kooplearn.datasets import make_logistic_map

# Defining logistic map "hyperparameters" and generating the trajectory
M = 20
random_state = 42

traj = make_logistic_map(
    X0=0.5,
    n_steps=n_train_samples + n_val_samples + n_test_samples,
    M=M,
    random_state=0,
)  # Setting the random_state for reproducibility

dataset = {
    "train": traj[:n_train_samples],
    "validation": traj[n_train_samples : n_train_samples + n_val_samples],
    "test": traj[n_train_samples + n_val_samples :],
}

train_data = torch.from_numpy(dataset["train"].values).float()
val_data = torch.from_numpy(dataset["validation"].values).float()
[3]:
from torch.utils.data import DataLoader, TensorDataset

batch_size = 128

# Creating PyTorch TensorDatasets
train_ds = TensorDataset(train_data[:-1], train_data[1:])
val_ds = TensorDataset(val_data[:-1], val_data[1:])

# Creating DataLoaders
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=len(val_ds), shuffle=False)

Learning an appropriate feature map

In the next step, we implement the code needed to train the feature maps used to approximate the evolution (transfer) operator.

[4]:
# Experiment hyperparameters
learning_rate = 2e-4
opt = torch.optim.AdamW
num_epochs = 50
layer_dims = [64, 128, 64]
latent_dim = 8

Sinusoidal Embedding

The logistic map is defined on the interval \([0, 1]\) and is inherently periodic. To account for this property, we featurize the signal using trigonometric functions, which naturally encode periodicity. Specifically, we use \(\sin(2\pi x)\) and \(\cos(2\pi x)\) as the embedding features.

[5]:
class SinusoidalEmbedding(torch.nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        # Assuming x is in [0, 1]
        x = 2 * torch.pi * x
        return torch.cat([torch.sin(x), torch.cos(x)], dim=-1)

Creating an MLP using PyTorch

Next, we use the PyTorch library to define our neural network with the specified hyperparameters:

[6]:
class SimpleMLP(torch.nn.Module):
    def __init__(
        self, latent_dim: int, layer_dims: list[int], activation=torch.nn.LeakyReLU
    ):
        super().__init__()
        self.activation = activation
        lin_dims = (
            [2] + layer_dims + [latent_dim]
        )  # The 2 is for the sinusoidal embedding

        layers = []

        for layer_idx in range(len(lin_dims) - 2):
            layers.append(
                torch.nn.Linear(
                    lin_dims[layer_idx], lin_dims[layer_idx + 1], bias=False
                )
            )
            layers.append(activation())

        layers.append(torch.nn.Linear(lin_dims[-2], lin_dims[-1], bias=True))

        self.layers = torch.nn.ModuleList(layers)
        self.sin_embedding = SinusoidalEmbedding()

    def forward(self, x):
        # Sinusoidal embedding
        x = self.sin_embedding(x).float()
        # MLP
        for layer in self.layers:
            x = layer(x)
        return x
[ ]:
from kooplearn.linear_model import Ridge
from kooplearn.torch.utils import FeatureMapEmbedder

device = "cuda" if torch.cuda.is_available() else "cpu"

class FeatureMap(torch.nn.Module):
    def __init__(self, latent_dim: int, normalize_latents: bool = True):
        super().__init__()
        self.normalize_latents = normalize_latents
        self.backbone = SimpleMLP(latent_dim=latent_dim, layer_dims=layer_dims)
        self.lin = torch.nn.Linear(latent_dim, latent_dim, bias=False)

    def forward(self, X, lagged: bool = False):
        z = self.backbone(X)
        if self.normalize_latents:
            z = torch.nn.functional.normalize(z, dim=-1)
        if lagged:
            z = self.lin(z)
        return z


def train_encoder_only(criterion: torch.nn.Module):
    torch.manual_seed(random_state)
    # Initialize model, loss and optimizer
    model = FeatureMap(latent_dim).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    def step(batch, is_train: bool = True):
        batch_X, batch_Y = batch
        batch_X, batch_Y = batch_X.to(device), batch_Y.to(device)
        if is_train:
            optimizer.zero_grad()
        phi_X, phi_Y = model(batch_X), model(batch_Y, lagged=True)
        loss = criterion(phi_X, phi_Y)
        if is_train:
            loss.backward()
            optimizer.step()
        return loss.item()

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = []
        for batch in train_dl:
            train_loss.append(step(batch))
        # Validation phase
        model.eval()
        val_loss = []
        with torch.no_grad():
            for batch in val_dl:
                val_loss.append(step(batch, is_train=False))

        if (epoch + 1) % 5 == 0 or (epoch == 0):
            print(
                f"EPOCH {epoch + 1:>2}  Loss: {np.mean(train_loss):.2f} (train) -  " \
                    f"{np.mean(val_loss):.2f} (val)"
            )

    embedder = FeatureMapEmbedder(encoder=model)
    evolution_operator_model = Ridge(n_components=latent_dim).fit(
        embedder.transform(train_data), train_data.numpy(force=True)
    )

    return {
        "model": evolution_operator_model,
        "feature_map": embedder.transform,
    }
[8]:
from kooplearn.torch.nn import SpectralContrastiveLoss, VampLoss

trained_models = {}
for name, criterion in zip(
    ["VAMPNets", "Spectral Contrastive Loss"],
    [VampLoss(center_covariances=False), SpectralContrastiveLoss()],
):
    print(f"Fitting {name}")
    trained_models[name] = train_encoder_only(criterion)
Fitting VAMPNets
EPOCH  1  Loss: -3.23 (train) -  -3.07 (val)
EPOCH  5  Loss: -3.29 (train) -  -3.11 (val)
EPOCH 10  Loss: -3.30 (train) -  -3.14 (val)
EPOCH 15  Loss: -3.31 (train) -  -3.14 (val)
EPOCH 20  Loss: -3.30 (train) -  -3.13 (val)
EPOCH 25  Loss: -3.20 (train) -  -3.00 (val)
EPOCH 30  Loss: -3.10 (train) -  -3.05 (val)
EPOCH 35  Loss: -3.00 (train) -  -2.80 (val)
EPOCH 40  Loss: -2.95 (train) -  -2.82 (val)
EPOCH 45  Loss: -2.95 (train) -  -2.81 (val)
Warning: The fitting algorithm discarded 3 dimensions of the 8 requested out of numerical instabilities.
The rank attribute has been updated to 5.
Consider decreasing the rank parameter.
EPOCH 50  Loss: -2.94 (train) -  -2.83 (val)
Fitting Spectral Contrastive Loss
EPOCH  1  Loss: -0.73 (train) -  -1.02 (val)
EPOCH  5  Loss: -1.32 (train) -  -1.35 (val)
EPOCH 10  Loss: -1.59 (train) -  -1.61 (val)
EPOCH 15  Loss: -1.82 (train) -  -1.84 (val)
EPOCH 20  Loss: -2.04 (train) -  -2.06 (val)
EPOCH 25  Loss: -2.26 (train) -  -2.28 (val)
EPOCH 30  Loss: -2.45 (train) -  -2.46 (val)
EPOCH 35  Loss: -2.62 (train) -  -2.65 (val)
EPOCH 40  Loss: -2.77 (train) -  -2.79 (val)
EPOCH 45  Loss: -2.88 (train) -  -2.91 (val)
Warning: The fitting algorithm discarded 2 dimensions of the 8 requested out of numerical instabilities.
The rank attribute has been updated to 6.
Consider decreasing the rank parameter.
EPOCH 50  Loss: -2.99 (train) -  -3.00 (val)

Determistic feature map based on Chebyshev Polynomials

[9]:
class ChebyT:
    def __init__(self, feature_dim: int = 8):
        self.feature_dim = feature_dim
    def __call__(self, x):
        vals = [self.scaled_chebyt(n, x) for n in range(self.feature_dim)]
        return np.concatenate(vals, axis=-1)
    def scaled_chebyt(self, n, x):
        return scipy.special.eval_chebyt(n, 2 * x - 1)

cheby_model = Ridge(n_components=latent_dim).fit(
    ChebyT(feature_dim=latent_dim)(dataset["train"].values))
trained_models["ChebyT"] = {
    "model": cheby_model,
    "feature_map": ChebyT(feature_dim=latent_dim),
}

Evaluation Metrics

Now it is time to evaluate the performance of model. To do this, for this specific example we will use two different metrics that are used in the paper: (i) the optimality gap, and (ii) the spectral error.

Optimality Gap

The definition of the optimality gap in the paper is $ \sum{i = 1}^{3} :nbsphinx-math:`sigma`{i}^{2}(\tau) - \mathcal{P}`^{0}(w)$, which informs on how close one is to capture the best rank-r approximation of the transfer operator, :math:mathcal{T}`.

Spectral Error

The spectral error is calculated by $ max_i  min_j \left `\| :nbsphinx-math:lambda`i(:nbsphinx-math:`mathcal{P}`{\mathcal{H}}:nbsphinx-math:tau_{|:nbsphinx-math:mathcal{H}}) - \lambda_j(\tau) \right `\|$. This formula measures how well the true eigenvalues of :math:mathcal{T}` can be recovered within the representation space \(\mathcal{H_w}\).

In order to calculate the above metrics for evaluation the performance of the learned representation we should first calculate populational covariances and cross-covariances for the learned feature map.

The function get_population_covs computes population covariance and cross-covariance matrices relative to a given feature map.

[10]:
from kooplearn.datasets import (
    compute_logistic_map_eig,
    compute_logistic_map_invariant_pdf,
)
from kooplearn.datasets._logistic_map import TrigonometricNoise, logistic_map

invariant_pdf = compute_logistic_map_invariant_pdf(M=M)
transition_pdf = TrigonometricNoise(M=M).pdf


def get_population_covs(feature_map, integration_points=2**12 + 1):
    # Covariance:
    x = np.linspace(0, 1, integration_points)
    dx = x[1] - x[0]

    # Covariance:
    phi_X = feature_map(x[:, None])
    cov_X = np.einsum("xi,xj,x->xij", phi_X, phi_X, invariant_pdf(x))
    cov_X = scipy.integrate.romb(cov_X, dx=dx, axis=0)
    # Cross-Covariance:
    X, Y = np.meshgrid(x, x, indexing="ij")
    cov_XY = np.einsum(
        "xi,yj,x,xy->xyij",
        phi_X,
        phi_X,
        invariant_pdf(x),
        transition_pdf(logistic_map(X) - Y),
    )
    cov_XY = scipy.integrate.romb(cov_XY, dx=dx, axis=0)
    cov_XY = scipy.integrate.romb(cov_XY, dx=dx, axis=0)
    return cov_X, cov_XY
[ ]:
from kooplearn.metrics import directed_hausdorff_distance

ref_eigs = compute_logistic_map_eig(M=M)
for key, model_dict in trained_models.items():
    feature_map = model_dict["feature_map"]
    cov, cross_cov = get_population_covs(feature_map)
    OLS_eigs = scipy.linalg.eigvals(cross_cov, cov)
    empirical_OLS_eigs = model_dict["model"].eig()
    spectral_dist = directed_hausdorff_distance(OLS_eigs, ref_eigs)
    empirical_spectral_dist = directed_hausdorff_distance(empirical_OLS_eigs, ref_eigs)
    print(
        f"{key} - Spectral Dist: {spectral_dist:.4f} - Empirical Spectral Dist: " \
            f"{empirical_spectral_dist:.4f}"
    )
VAMPNets - Spectral Dist: 0.2301 - Empirical Spectral Dist: 0.4664
Spectral Contrastive Loss - Spectral Dist: 0.1801 - Empirical Spectral Dist: 0.3862
ChebyT - Spectral Dist: 0.0789 - Empirical Spectral Dist: 0.2755