Skip to content

🌌 Creating Data Streams

This notebook demonstrates how to create evolving distributions with streamgen using parameter schedules.


📄 Table of Contents

  1. 🌳 Sampling tree
  2. ⚙️ Parameter schedules
  3. 🫂 Putting everything together
import itables
import numpy as np
import pandas as pd
import seaborn as sns
from rich import print

from streamgen import visualizations
from streamgen.parameter.store import ParameterStore
from streamgen.samplers.tree import SamplingTree

SEED = 42
rng = np.random.default_rng(SEED)

itables.init_notebook_mode(all_interactive=False)

🌳 Sampling tree

We will re-use our time-series example from examples/time series classification/01-static-distributions.ipynb:

# ➡️ transforms and generators
def background(signal_, signal_length: int, offset: float, strength: float) -> np.ndarray:  # noqa: D103, ANN001, ARG001
    return rng.normal(offset, strength, signal_length)


def ramp(signal: np.ndarray, height: float, length: int) -> np.ndarray:  # noqa: D103
    ramp_signal = np.zeros(len(signal))
    ramp_start = rng.choice(range(len(signal) - length))
    ramp_signal[ramp_start : ramp_start + length] = np.linspace(0.0, height, length)
    return signal + ramp_signal


def step(signal: np.ndarray, length: int, kernel_size: int) -> np.ndarray:  # noqa: D103
    step_signal = np.zeros(len(signal))
    step_start = rng.choice(range(len(signal) - length))
    step_signal[step_start : step_start + length] = 1.0
    kernel = np.ones(kernel_size) / kernel_size
    step_signal = np.convolve(step_signal, kernel, mode="same")
    return signal + step_signal


# ⚙️ scoped parameter
params = {
    "background": {
        "signal_length": 256,
        "offset": 0.0,
        "strength": 0.1,
    },
    "ramp": {
        "height": 1.0,
        "length": 128,
    },
    "step": {
        "length": 128,
        "kernel_size": 10,
    },
}

# 🎲🌳 tree of transformations
tree = SamplingTree(
    [
        background,
        {
            "background": "background",
            "ramp": [ramp, "ramp"],
            "step": [step, "step"],
        },
    ],
    params,
)

print(tree)
🌳
➡️ `background(offset=0.0, signal_length=256, strength=0.1)`
╰── 🪴 `branching_node()`
    ├── 🏷️ `background`
    ├── ➡️ `ramp(height=1.0, length=128)`
    │   ╰── 🏷️ `ramp`
    ╰── ➡️ `step(kernel_size=10, length=128)`
        ╰── 🏷️ `step`

⚙️ Parameter schedules

The tree we defined currently represents a static distribution, since the parameters that influence how the signals are constructed don't change over time.

If we want to model evolving distributions, we either need to change the parameters or topology of the tree over time. In this notebook, we will focus on scheduling the parameters.

parameter schedule

streamgen has its own Parameter class, which represents a named variable that can change over time as defined by its schedule.

A schedule of a variable is simply an Iterable of values. Additionally, you can define what happens when a schedule is exhausted via a parameters strategy.

In the previous example, we used simple nested dictionaries to set parameters via name-value pairs. Behind the scenes, the SamplingTree class converts this dictionary to a ParameterStore object, which is a container for Parameter objects. When you want to set the schedule and update strategy of your parameters, you need to define a ParameterDict as a value.

Lets create schedules for our example parameters:

params = ParameterStore(
    {
        "background": {
            "signal_length": 256,  # parameters don't need to have a schedule
            "offset": {"schedule": [0.0, 0.2, 0.5]},  # if only a schedule is defined, the first element will be used as the value
            "strength": {
                "schedule": [0.1, 0.4],
                "strategy": "cycle",  # cycle the schedule when it is exhausted. The default is holding the last value
            },
        },
        "ramp": {
            "height": {"schedule": [0.9, 0.8, 0.7]},
            "length": {"schedule": [128, 64, 32]},
        },
        "step": {
            "length": {"value": 128},
            "kernel_size": {"schedule": [1, 10, 20]},
        },
    },
)

print(params)
{
    'background.offset': 0.0,
    'background.signal_length': 256,
    'background.strength': 0.1,
    'ramp.height': 0.9,
    'ramp.length': 128,
    'step.kernel_size': 1,
    'step.length': 128
}
# 🆙 updating parameters
params.update()

print(params)
{
    'background.offset': 0.2,
    'background.signal_length': 256,
    'background.strength': 0.4,
    'ramp.height': 0.8,
    'ramp.length': 64,
    'step.kernel_size': 10,
    'step.length': 128
}

ParameterStores can also be initialized from pandas.DataFrames 🐼:

df = pd.DataFrame(
    {
        "background.signal_length": 256,
        "background.offset": [0.0, 0.2, 0.5],
        "background.strength": [0.1, 0.2, 0.4],
        "ramp.height": [0.9, 0.8, 0.7],
        "ramp.length": [128, 64, 32],
        "step.length": 128,
        "step.kernel_size": [1, 10, 20],
    },
)

itables.show(df, scrollx=True)
background.signal_length background.offset background.strength ramp.height ramp.length step.length step.kernel_size
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)
params = ParameterStore.from_dataframe(df)

print(params)
{
    'background.offset': 0.0,
    'background.signal_length': 256,
    'background.strength': 0.1,
    'ramp.height': 0.9,
    'ramp.length': 128,
    'step.kernel_size': 1,
    'step.length': 128
}

🖼️ Parameters and ParameterStores can also be plotted using functions in streamgen.visualizations

visualizations.plot_parameter_store_widget(params, num_values=3)
Tab(children=(Output(), Output(), Output()), selected_index=0, titles=('step', 'ramp', 'background'))

🥲 Unfortunately, ipywidgets are not rendered in the hosted documentation, so we include a screenshot of the widget for reference:

plot_parameter_store_widget output

🫂 Putting everything together

With those two concepts in place, we can define and sample from dynamic distributions:

%matplotlib notebook
# 🎲🌳 tree of transformations
tree = SamplingTree(
    [
        background,
        {
            "background": "background",
            "ramp": [ramp, "ramp"],
            "step": [step, "step"],
        },
    ],
    params,
)

# 🌌 simulate three experiences and collect 16 samples for each experience
experiences = []
for idx in range(3):
    print(f"Experience #{idx+1}")
    experiences.append(tree.collect(16))
    display(visualizations.plot_labeled_samples_animation(tree, lambda sample, ax: sns.lineplot(sample, ax=ax)))
    tree.update()
Experience #1
Output()


Experience #2
Output()


Experience #3
Output()


When evaluating continual learning systems on streams, it is often required to test on previous experiences. For these scenarios, as an alternative to calling tree.update, users can set the tree to a specific step in its parameter values using tree.set_update_step(step):

# set the tree to update step/experience 1
tree.set_update_step(1)
print(tree.get_params())
samples = tree.collect(100)
{
    'background.offset': 0.2,
    'background.signal_length': 256,
    'background.strength': 0.2,
    'ramp.height': 0.8,
    'ramp.length': 64,
    'step.kernel_size': 10,
    'step.length': 128
}
Output()