Module streamgen.streams
🌌 stream abstractions.
View Source
"""🌌 stream abstractions."""
# ruff: noqa: ERA001
from typing import Any
import numpy as np
from beartype import beartype
from loguru import logger
from streamgen import is_extra_installed
from streamgen.samplers import Sampler
if is_extra_installed("cl"):
    import torch

    #! avalanche-lib has a broken version constraint on torchcv
    # * because I do not want to wait for the fix and since these methods
    # * only provide application-specific starting points, I decided to exclude them temporarily
    # from avalanche.benchmarks.utils import as_classification_dataset
    # from avalanche.benchmarks.utils.classification_dataset import ClassificationDataset
    from continuum.datasets import InMemoryDataset
    from continuum.scenarios.continual_scenario import ContinualScenario
    from torch.utils.data import TensorDataset


@beartype
def collect_stream(
    sampler: Sampler,
    n_experiences: int,
    n_samples_per_experience: int = 100,
) -> list[Any]:
    """🌌 collects a stream of `n_experiences` with `n_samples_per_experience` by calling `sampler.collect`.

    Args:
        sampler (Sampler): data generator
        n_experiences (int): Number of experiences. Each experience represents a different parameterization of the sampler.
        n_samples_per_experience (int, optional): Number of samples to collect in each experience. Defaults to 100.

    Returns:
        list[Any]: a list of experiences. The format and type of each experience depends on the `collect` implementation of the sampler.
    """
    logger.info(f"🌌 collecting {n_experiences} experiences with {n_samples_per_experience} samples each.")
    return [sampler.collect(n_samples_per_experience) for _ in range(n_experiences)]


@beartype
def to_tensor_dataset(samples: np.ndarray, targets: np.ndarray) -> TensorDataset:
    """🗃️ constructs a TensorDataset with a `targets` field from two numpy arrays."""
    samples = torch.tensor(samples)
    targets = torch.tensor(targets)
    dataset = TensorDataset(samples, targets)
    dataset.targets = targets
    return dataset


# @beartype
# def construct_avalanche_classification_datasets(experiences: list[tuple[np.ndarray, np.ndarray]]) -> list[ClassificationDataset]:
#     """❄️ constructs a list of avalanche `ClassificationDataset`s.
#
#     This can be used with `avalanche.benchmarks.scenarios.dataset_scenario.benchmark_from_datasets` to create an avalanche benchmark.
#
#     Args:
#         experiences (list[tuple[np.ndarray, np.ndarray]]): list of experiences.
#
#     Returns:
#         list[ClassificationDataset]: list of avalanche `ClassificationDataset`s.
#     """
#     stream = []
#     for experience in experiences:
#         samples, targets = experience
#         dataset = to_tensor_dataset(samples, targets)
#         dataset = as_classification_dataset(dataset)
#         stream.append(dataset)
#     return stream


@beartype
def construct_continuum_scenario(experiences: list[tuple[np.ndarray, np.ndarray]]) -> ContinualScenario:
    """🕑 constructs a continuum `ContinualScenario`.

    Args:
        experiences (list[tuple[np.ndarray, np.ndarray]]): list of experiences.

    Returns:
        ContinualScenario: generic continuum scenario.
    """
    x, y, t = [], [], []
    for idx, experience in enumerate(experiences):
        samples, targets = experience
        task_ids = np.ones_like(targets) * idx
        x.append(samples)
        y.append(targets)
        t.append(task_ids)
    x = np.concatenate(x)
    y = np.concatenate(y)
    t = np.concatenate(t)
    dataset = InMemoryDataset(x, y, t)
    return ContinualScenario(dataset)
Functions
collect_stream
def collect_stream(
    sampler: streamgen.samplers.Sampler,
    n_experiences: int,
    n_samples_per_experience: int = 100
) -> list[typing.Any]

🌌 collects a stream of `n_experiences` with `n_samples_per_experience` by calling `sampler.collect`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sampler | Sampler | data generator | *required* |
n_experiences | int | Number of experiences. Each experience represents a different parameterization of the sampler. | *required* |
n_samples_per_experience | int | Number of samples to collect in each experience. | 100 |

Returns:

Type | Description |
---|---|
list[Any] | a list of experiences. The format and type of each experience depends on the `collect` implementation of the sampler. |
View Source
@beartype
def collect_stream(
    sampler: Sampler,
    n_experiences: int,
    n_samples_per_experience: int = 100,
) -> list[Any]:
    """🌌 collects a stream of `n_experiences` with `n_samples_per_experience` by calling `sampler.collect`.

    Args:
        sampler (Sampler): data generator
        n_experiences (int): Number of experiences. Each experience represents a different parameterization of the sampler.
        n_samples_per_experience (int, optional): Number of samples to collect in each experience. Defaults to 100.

    Returns:
        list[Any]: a list of experiences. The format and type of each experience depends on the `collect` implementation of the sampler.
    """
    logger.info(f"🌌 collecting {n_experiences} experiences with {n_samples_per_experience} samples each.")
    return [sampler.collect(n_samples_per_experience) for _ in range(n_experiences)]
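Example (a minimal sketch, not part of the module): `ToySampler` below is hypothetical and assumes that `streamgen.samplers.Sampler` is a structural (protocol-style) interface satisfied by any object with a matching `collect` method; if `Sampler` is a nominal base class, subclass it instead.

import numpy as np

class ToySampler:
    """Hypothetical sampler: random features with integer class targets."""

    def collect(self, num_samples: int) -> tuple[np.ndarray, np.ndarray]:
        samples = np.random.randn(num_samples, 8).astype(np.float32)  # 8 features
        targets = np.random.randint(0, 3, size=num_samples)  # 3 classes
        return samples, targets

stream = collect_stream(ToySampler(), n_experiences=5, n_samples_per_experience=200)
assert len(stream) == 5  # one (samples, targets) tuple per experience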
construct_continuum_scenario
def construct_continuum_scenario(
    experiences: list[tuple[numpy.ndarray, numpy.ndarray]]
) -> continuum.scenarios.continual_scenario.ContinualScenario

🕑 constructs a continuum `ContinualScenario`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
experiences | list[tuple[np.ndarray, np.ndarray]] | list of experiences. | *required* |

Returns:

Type | Description |
---|---|
ContinualScenario | generic continuum scenario. |
View Source
@beartype
def construct_continuum_scenario(experiences: list[tuple[np.ndarray, np.ndarray]]) -> ContinualScenario:
    """🕑 constructs a continuum `ContinualScenario`.

    Args:
        experiences (list[tuple[np.ndarray, np.ndarray]]): list of experiences.

    Returns:
        ContinualScenario: generic continuum scenario.
    """
    x, y, t = [], [], []
    for idx, experience in enumerate(experiences):
        samples, targets = experience
        task_ids = np.ones_like(targets) * idx
        x.append(samples)
        y.append(targets)
        t.append(task_ids)
    x = np.concatenate(x)
    y = np.concatenate(y)
    t = np.concatenate(t)
    dataset = InMemoryDataset(x, y, t)
    return ContinualScenario(dataset)
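Example (a sketch; the random arrays stand in for `collect_stream` output): continuum scenarios are iterable, yielding one task set per unique task id, so each experience becomes one task.

import numpy as np

experiences = [
    (np.random.randn(100, 8).astype(np.float32), np.random.randint(0, 3, size=100))
    for _ in range(5)
]

scenario = construct_continuum_scenario(experiences)
print(scenario.nb_tasks)  # 5 -- one task per experience
for taskset in scenario:
    print(len(taskset))  # 100 samples per task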
to_tensor_dataset
def to_tensor_dataset(
    samples: numpy.ndarray,
    targets: numpy.ndarray
) -> torch.utils.data.dataset.TensorDataset

🗃️ constructs a TensorDataset with a `targets` field from two numpy arrays.
View Source
@beartype
def to_tensor_dataset(samples: np.ndarray, targets: np.ndarray) -> TensorDataset:
    """🗃️ constructs a TensorDataset with a `targets` field from two numpy arrays."""
    samples = torch.tensor(samples)
    targets = torch.tensor(targets)
    dataset = TensorDataset(samples, targets)
    dataset.targets = targets
    return dataset
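Example (a sketch with placeholder arrays): the extra `targets` attribute is what classification-oriented wrappers such as avalanche's `as_classification_dataset` (used in the commented-out code above) look for, and the returned `TensorDataset` plugs straight into a `DataLoader`.

import numpy as np
from torch.utils.data import DataLoader

samples = np.random.randn(100, 8).astype(np.float32)
targets = np.random.randint(0, 3, size=100)

dataset = to_tensor_dataset(samples, targets)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
x, y = next(iter(loader))
print(x.shape, y.shape)  # torch.Size([32, 8]) torch.Size([32])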