Data Processing Pipeline#

Source Files
  • twiga/core/data/pipeline.py - DataPipeline orchestrator

  • twiga/core/data/temporal.py - TemporalFeatureTransformer

  • twiga/core/data/autores.py - AutoregressTransformer

  • twiga/core/data/feature.py - Fourier series, day/night, net-load helpers

  • twiga/core/data/selection.py - Feature selection utilities

  • twiga/core/data/processing.py - Sequence creation, lag/rolling augmentation

  • twiga/core/data/loader.py - TimeseriesDataModule (PyTorch Lightning)

The Data Pipeline is responsible for transforming a raw pandas DataFrame into model-ready NumPy arrays. It handles temporal feature extraction, autoregressive feature engineering, column-wise scaling, and sliding-window sequence creation - all behind a single scikit-learn-compatible transformer.

Pipeline Overview#

The following diagram shows the end-to-end flow from raw data to the arrays consumed by ML and NN models.

flowchart LR
    A["Raw DataFrame\n(timestamp + features)"] --> B["TemporalFeatureTransformer\n(calendar & Fourier features)"]
    B --> C["AutoregressTransformer\n(lags & rolling windows)"]
    C --> D["ColumnTransformer\n(target + input scaling)"]
    D --> E["Sequence Creation\n(sliding window)"]
    E --> F["Model-ready arrays\n(features, targets)"]

  1. TemporalFeatureTransformer - Extracts calendar features (hour, dayofweek, month, …), adds Fourier-encoded cyclical features, and optionally adds a binary day_night indicator based on geographic coordinates.

  2. AutoregressTransformer - Creates lagged copies of the target variable and computes rolling-window statistics (mean, std, etc.).

  3. ColumnTransformer (Scaling) - Applies separate scalers to target columns and numerical input columns via scikit-learn’s ColumnTransformer.

  4. Sequence Creation - Uses NumPy stride tricks to build overlapping (lookback_window, num_features) input windows and (forecast_horizon, num_targets) target windows.

The DataPipeline class wires these stages together into a single sklearn.pipeline.Pipeline and exposes a familiar fit / transform / fit_transform API.

DataPipeline#

DataPipeline extends TransformerMixin and BaseEstimator, so it slots directly into scikit-learn workflows.

Constructor Parameters#

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| target_feature | list[str] \| str | required | Column name(s) of the variable(s) to forecast. |
| period | str | required | Sampling frequency as a pandas offset alias (e.g. "30min", "1h"). |
| lookback_window_size | int | required | Number of past time steps used as input features. |
| forecast_horizon | int | required | Number of future time steps to predict. |
| latitude | float \| None | None | Latitude for day/night feature calculation (required when "day_night" is in calendar_features). |
| longitude | float \| None | None | Longitude for day/night feature calculation. |
| historical_features | list[str] \| None | None | Columns with unknown future values (past-only context). |
| calendar_features | list[str] \| None | None | Temporal features to extract (see table below). |
| exogenous_features | list[str] \| None | None | External columns available for both past and future windows. |
| future_covariates | list[str] \| None | None | External columns available only for the future horizon window. |
| input_scaler | object \| None | None | Scaler applied to numerical input features. Defaults to FunctionTransformer() (identity) when None. |
| target_scaler | object \| None | None | Scaler applied to target column(s). Defaults to FunctionTransformer() (identity) when None. |
| lags | list[int] \| None | None | Lag intervals for autoregressive features (in periods). |
| windows | list[int] \| int \| None | None | Rolling-window sizes for statistics (in periods). |
| window_funcs | list[str] \| str \| None | None | Aggregation functions for rolling windows (e.g. "mean", "std"). |
| date_column | str | "timestamp" | Name of the datetime column. |
| stride | int | 1 | Step between consecutive sliding windows. Default 1 produces fully overlapping windows. Set to forecast_horizon for non-overlapping windows. |

Configuration shortcut

In practice you rarely construct DataPipeline directly. Instead, create a DataPipelineConfig and pass it to TwigaForecaster, which builds the pipeline for you.

Methods#

| Method | Signature | Description |
| --- | --- | --- |
| fit | fit(data: pd.DataFrame, y=None) -> DataPipeline | Builds the internal sklearn.pipeline.Pipeline (feature extraction + scaling) and fits it on the provided DataFrame. |
| transform | transform(data: pd.DataFrame) -> tuple[np.ndarray, np.ndarray] | Applies the fitted pipeline and returns (features, targets) as 3-D NumPy arrays. |
| fit_transform | fit_transform(X: pd.DataFrame, y=None) -> tuple[np.ndarray, np.ndarray] | Convenience method: calls fit then transform. |
| transform_features | transform_features(data: pd.DataFrame) -> np.ndarray | Returns only the feature array (past features stacked with future covariates). |
| transform_targets | transform_targets(data: pd.DataFrame) -> np.ndarray | Returns only the target array. |
| get_ground_truth_sequences | get_ground_truth_sequences(data: pd.DataFrame) -> tuple[np.ndarray, np.ndarray] | Returns (timestamps, targets) aligned with the feature extraction window for evaluation. |
| prepare_data | prepare_data(data: pd.DataFrame) -> None | Runs the internal pipeline and stores the transformed DataFrame on self.data. Raises NotFittedError if fit() has not been called. |

Feature Engineering Stages#

TemporalFeatureTransformer#

Adds calendar and cyclical features derived from the datetime column.

from twiga.core.data.temporal import TemporalFeatureTransformer

transformer = TemporalFeatureTransformer(
    calendar_features=["hour", "dayofweek", "month", "day_night"],
    latitude=-6.8,
    longitude=39.3,
    date_column="timestamp",
)

df_transformed = transformer.fit_transform(df)

Supported Calendar Features#

The transformer delegates to augment_timeseries_signature, which extracts 29 datetime-based features from the date column. You select which ones to keep via calendar_features. The most commonly used are:

| Feature | Type | Description |
| --- | --- | --- |
| hour | Trigonometric | Hour of day (0–23). Fourier-encoded as hour_sin, hour_cos, hour_cosin. |
| dayofweek / wday | Trigonometric | Day of week (1=Monday … 7=Sunday). Fourier-encoded. |
| month | Trigonometric | Month number (1–12). Fourier-encoded. |
| quarter | Trigonometric | Quarter (1–4). Fourier-encoded. |
| day_night | Binary | 1 during daylight, 0 at night. Computed from sunrise/sunset using the astral library. Requires latitude and longitude. |
| weekend | Binary | 1 for Saturday/Sunday, 0 otherwise. |
| am_pm | Categorical | "am" or "pm". |
| yday | Integer | Day of the year (1–366). |
| mday | Integer | Day of the month (1–31). |
| yweek | Integer | ISO week of the year. |
| mweek | Integer | Week of the month (1–5). |

Fourier encoding

Trigonometric features are automatically Fourier-encoded using get_fourier_series(). For each feature, three columns are added: <feature>_sin, <feature>_cos, and <feature>_cosin (the sum of sine and cosine). This captures cyclical patterns without introducing artificial discontinuities (e.g. hour 23 is close to hour 0).
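The encoding described above can be sketched with plain NumPy. This is a minimal illustration, not the library's get_fourier_series(): the helper name fourier_encode and the angle formula 2π·value/period are assumptions, and the real implementation may scale or order the terms differently.

```python
import numpy as np


def fourier_encode(values, period):
    """Return (sin, cos, cosin) encodings of a cyclical feature.

    Hypothetical helper: assumes angle = 2*pi*value/period.
    """
    angle = 2.0 * np.pi * np.asarray(values, dtype=float) / period
    sin, cos = np.sin(angle), np.cos(angle)
    return sin, cos, sin + cos  # cosin is the sum of sine and cosine


hours = np.arange(24)
hour_sin, hour_cos, hour_cosin = fourier_encode(hours, period=24)


def circle_distance(i, j):
    """Euclidean distance between two hours in (sin, cos) space."""
    return float(np.hypot(hour_sin[i] - hour_sin[j], hour_cos[i] - hour_cos[j]))


# Hour 23 sits next to hour 0 on the circle; hour 12 sits opposite it.
print(circle_distance(23, 0), circle_distance(12, 0))
```

In raw integer form, hours 23 and 0 are 23 units apart; in (sin, cos) space they are neighbours, which is the discontinuity the encoding removes.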

AutoregressTransformer#

Adds lagged values and rolling-window statistics of the target variable.

from twiga.core.data.autores import AutoregressTransformer

transformer = AutoregressTransformer(
    n_samples=48,              # 48 samples/day for 30-min data
    lags=[1, 24, 48, 168],    # lag by 1, 24, 48, 168 periods
    windows=[24, 48],          # rolling windows of 24 and 48 periods
    window_funcs=["mean", "std"],
    date_column="timestamp",
    value_column="load_mw",
)

df_transformed = transformer.fit_transform(df)

Lag Features#

Lags are specified as multiples of n_samples (that is, in days when n_samples is the number of samples per day) and are internally multiplied by n_samples to convert them to row offsets. For example, with period="30min" (n_samples=48 per day) and lags=[1, 24]:

| Specified Lag | Scaled Lag | Column Created |
| --- | --- | --- |
| 1 | 48 | load_mw_lag_48 |
| 24 | 1152 | load_mw_lag_1152 |

Note

Rows at the beginning of the DataFrame that cannot be filled (due to the lag offset) are dropped. The number of dropped rows is tracked in max_data_drop.
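The lag scaling and row dropping described above can be reproduced with pandas alone. This is a sketch under assumptions: it uses Series.shift rather than the library's internals, and it takes max_data_drop to be the largest scaled lag, which may not match how AutoregressTransformer computes it when rolling windows are also configured.

```python
import numpy as np
import pandas as pd

n_samples = 48   # rows per day for 30-min data
lags = [1, 2]    # specified as multiples of n_samples

df = pd.DataFrame(
    {
        "timestamp": pd.date_range("2024-01-01", periods=5 * n_samples, freq="30min"),
        "load_mw": np.arange(5 * n_samples, dtype=float),
    }
)

lagged = df.copy()
for lag in lags:
    rows = lag * n_samples  # scaled lag in rows
    lagged[f"load_mw_lag_{rows}"] = lagged["load_mw"].shift(rows)

# Leading rows that cannot be filled contain NaN and are dropped.
max_data_drop = max(lags) * n_samples  # assumed definition
lagged = lagged.dropna().reset_index(drop=True)

print(lagged.columns.tolist())
print(len(df) - len(lagged), max_data_drop)
```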

Rolling-Window Statistics#

Windows are also specified in natural units and scaled by n_samples. Each combination of window size and aggregation function produces a new column:

| Window | Function | Column Created |
| --- | --- | --- |
| 24 | mean | load_mw_rolling_mean_win_1152 |
| 24 | std | load_mw_rolling_std_win_1152 |
| 48 | mean | load_mw_rolling_mean_win_2304 |
| 48 | std | load_mw_rolling_std_win_2304 |

Supported rolling functions include any method available on a pandas Rolling object: mean, std, sum, min, max, median, var, quantile, skew, kurt, and custom callables passed as (name, func) tuples.
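As a rough illustration of the window-by-function expansion, the following pandas sketch builds one column per combination using the naming pattern from the table above. It uses a small n_samples so the demo stays short, and it is an assumption that the library applies the rolling window directly to the value column; whether it also shifts the series first is not stated here.

```python
import numpy as np
import pandas as pd

n_samples = 4                    # small value, for illustration only
windows = [1, 2]                 # multiples of n_samples
window_funcs = ["mean", "std"]

df = pd.DataFrame({"load_mw": np.arange(20, dtype=float)})

for window in windows:
    rows = window * n_samples    # scaled window size in rows
    roller = df["load_mw"].rolling(window=rows)
    for func in window_funcs:
        # Any pandas Rolling method name (mean, std, sum, ...) can be looked up this way.
        df[f"load_mw_rolling_{func}_win_{rows}"] = getattr(roller, func)()

print(df.columns.tolist())
```

The first rows - 1 entries of each new column are NaN because the window is not yet full, which is why rolling features also contribute to the number of dropped rows.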

Scaling#

The pipeline uses scikit-learn’s ColumnTransformer to apply separate scalers to different column groups:

| Column Group | Scaler Parameter | Default |
| --- | --- | --- |
| Target column(s) | target_scaler | FunctionTransformer() (identity) |
| Numerical inputs (historical + exogenous) | input_scaler | FunctionTransformer() (identity) |
| Everything else (calendar, lags, rolling) | remainder="passthrough" | No transformation |

Common scaler choices:

from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler

# Z-score normalization
input_scaler = StandardScaler()

# Robust to outliers
target_scaler = RobustScaler()

# Scale to [0, 1]
input_scaler = MinMaxScaler()

Warning

The pipeline calls fit on the scaler only during DataPipeline.fit(). At inference time (transform), the scaler uses the statistics learned during training. Always fit on training data only to prevent data leakage.
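The fit-on-train, transform-everywhere pattern can be shown with scikit-learn's ColumnTransformer directly. This is a standalone sketch on synthetic data, not the ColumnTransformer that DataPipeline builds internally; the column names and the train/test split are illustrative assumptions.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
df = pd.DataFrame(
    {
        "load_mw": rng.normal(100.0, 10.0, size=200),
        "temperature": rng.normal(25.0, 5.0, size=200),
        "hour_sin": np.sin(2 * np.pi * (np.arange(200) % 24) / 24),
    }
)
train_df, test_df = df.iloc[:150], df.iloc[150:]

scaling = ColumnTransformer(
    transformers=[
        ("target", StandardScaler(), ["load_mw"]),      # target scaler
        ("inputs", StandardScaler(), ["temperature"]),  # numerical input scaler
    ],
    remainder="passthrough",  # calendar columns pass through unchanged
)

scaling.fit(train_df)                       # statistics come from training rows only
train_scaled = scaling.transform(train_df)
test_scaled = scaling.transform(test_df)    # reuses the training statistics

print(scaling.named_transformers_["target"].mean_)
```

Output columns are ordered as listed in transformers, followed by the passthrough columns, so the target occupies column 0 in this sketch.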

Sequence Creation#

After feature engineering and scaling, the pipeline converts the flat DataFrame into 3-D arrays of overlapping windows using NumPy stride tricks.

How It Works#

Given a time series of length N, a lookback_window_size of L, and a forecast_horizon of H:

Time index:  0   1   2   ...   L-1   L   L+1   ...   L+H-1   L+H  ...  N-1

Window 0:    [--- features (L) ---]  [--- targets (H) ---]
Window 1:      [--- features (L) ---]  [--- targets (H) ---]
Window 2:        [--- features (L) ---]  [--- targets (H) ---]
  ...
Window K:                                [--- features (L) ---]  [--- targets (H) ---]

  • Feature sequences: shape (K, L, num_features) - each sample is a lookback window of all input features.

  • Target sequences: shape (K, H, num_targets) - each sample is the corresponding future window of target values.

  • K = N - L - H + 1 (number of valid windows with stride 1).

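When daily_features=True, the stride equals forecast_horizon instead of 1, producing non-overlapping day-aligned windows. The same spacing can be requested through the stride constructor parameter. For a general stride s, the number of windows is K = floor((N - L - H) / s) + 1.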
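The windowing above can be sketched with NumPy's sliding_window_view, which is built on stride tricks. The function name make_sequences and its signature are hypothetical and only illustrate the shapes involved; the library's own sequence-creation code in processing.py may be organised differently.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def make_sequences(features, targets, lookback, horizon, stride=1):
    """Map features (N, F) and targets (N, T) to ((K, L, F), (K, H, T))."""
    total = lookback + horizon
    # sliding_window_view appends the window axis last: (N - total + 1, F, total)
    feat_windows = sliding_window_view(features, total, axis=0)[::stride]
    targ_windows = sliding_window_view(targets, total, axis=0)[::stride]
    x = feat_windows[:, :, :lookback].transpose(0, 2, 1)  # first L steps of each window
    y = targ_windows[:, :, lookback:].transpose(0, 2, 1)  # last H steps of each window
    return x, y


N, L, H = 10, 4, 2
features = np.arange(N * 3, dtype=float).reshape(N, 3)
targets = features[:, :1]

x, y = make_sequences(features, targets, L, H)
print(x.shape, y.shape)        # K = N - L - H + 1 windows

x_s, y_s = make_sequences(features, targets, L, H, stride=H)
print(x_s.shape, y_s.shape)    # fewer windows, target windows do not overlap
```

With stride equal to the horizon, consecutive target windows tile the series without overlap, while the lookback windows still overlap whenever L is larger than H.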

Feature Stacking#

When future covariates are present, the pipeline combines past features and future covariates into a single array using stack_features():

Past features:   (K, L, num_past_features)
Future covariates: (K, H, num_covariate_features)

Combined:        (K, L + H, max(num_past_features, num_covariate_features))

The smaller feature dimension is zero-padded to match the larger one. Neural network models can then split the combined array back into past and future segments using unstack_features().
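The padding and concatenation can be illustrated as follows. The names stack_sketch and unstack_sketch are hypothetical stand-ins: the actual signatures of stack_features() and unstack_features() are not shown in this page, so treat this only as a shape-level sketch.

```python
import numpy as np


def stack_sketch(past, future):
    """Zero-pad the narrower array on the feature axis, then join along time."""
    width = max(past.shape[2], future.shape[2])

    def pad(arr):
        # Pad only the last (feature) axis, on the right, with zeros.
        return np.pad(arr, ((0, 0), (0, 0), (0, width - arr.shape[2])))

    return np.concatenate([pad(past), pad(future)], axis=1)


def unstack_sketch(combined, lookback, n_past, n_future):
    """Split the combined array back into past and future segments."""
    return combined[:, :lookback, :n_past], combined[:, lookback:, :n_future]


K, L, H = 8, 6, 3
past = np.random.default_rng(0).normal(size=(K, L, 5))
future = np.random.default_rng(1).normal(size=(K, H, 2))

combined = stack_sketch(past, future)
past_back, future_back = unstack_sketch(combined, L, 5, 2)
print(combined.shape)
```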

Feature Selection#

The selection module provides utilities to rank and select the most relevant features before training. For a deeper exploration of association methods and the full statistical toolkit, see Feature Analysis & Statistical Toolkit.

AssociationAnalyzer#

AssociationAnalyzer computes feature-target association scores using eight different methods in a single, consistent interface:

| Method | Key | Task |
| --- | --- | --- |
| Pearson correlation | "pearson" | Regression |
| Spearman rank correlation | "spearman" | Both |
| Kendall tau | "kendall" | Both |
| Xi-correlation (Chatterjee) | "xicor" | Both |
| Predictive Power Score | "pps" | Both |
| Mutual Information | "mi" | Both |
| ANOVA F-score | "anova" | Classification |
| Chi-squared | "chi2" | Classification |

from twiga.core.data import AssociationAnalyzer

analyzer = AssociationAnalyzer(
    data=df,
    features=["temperature", "ghi", "hour_sin", "hour_cos"],
    target="load_mw",
    task="regression",
)

# Run a single method
scores_pearson = analyzer.compute(method="pearson")

# Run all methods and compare
all_scores = analyzer.compute_all()

# Visualise as a heatmap
fig = analyzer.plot_heatmap(method="spearman")

select_top_features#

A comprehensive ensemble feature-ranking function that combines six signals:

  • Spearman rank correlation (regression) or ANOVA F-score (classification)

  • Mutual information (non-linear dependency)

  • Xi-correlation (non-parametric, detects arbitrary functional relationships)

  • Predictive Power Score (asymmetric, model-based)

  • Random Forest MDI importance (compute_rf_importance)

  • Chi-squared statistics (classification only, optional)

Rankings from each method are merged via Borda count rank aggregation (default) or alternative strategies (geom_rank, arith_rank, med_rank, sum_rank), producing a stable, ensemble-ranked feature list that is robust to the idiosyncrasies of any single metric.

from twiga.core.data.selection import select_top_features

top = select_top_features(
    data=df,
    features=feature_columns,
    target="load_mw",
    task="regression",
    top_k=10,
    alpha=0.05,          # optional p-value filter
    return_scores=True,  # include per-method scores in output
)
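To make the Borda count aggregation concrete, here is a small pandas sketch. The scores are made-up illustrative numbers, not results from any dataset, and the point scheme (the best of n features gets n - 1 points per method, ties share the average rank) is one common Borda variant that may differ in detail from what select_top_features uses.

```python
import pandas as pd

# Illustrative per-method scores (higher is better); values are invented.
scores = pd.DataFrame(
    {
        "spearman": [0.80, 0.10, 0.55, 0.30],
        "mutual_info": [0.60, 0.05, 0.70, 0.20],
        "rf_importance": [0.50, 0.10, 0.30, 0.10],
    },
    index=["temperature", "ghi", "hour_sin", "hour_cos"],
)

# Rank within each method: 1 = strongest association, ties get the average rank.
ranks = scores.abs().rank(ascending=False, method="average")

# Borda points: n - rank per method, summed across methods.
borda_points = (len(scores) - ranks).sum(axis=1)
ranking = borda_points.sort_values(ascending=False)

top_k = ranking.head(2).index.tolist()
print(ranking)
print(top_k)
```

Because only ranks enter the aggregate, methods with very different score scales (a correlation in [-1, 1] and an impurity importance, for example) contribute equally.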

compute_rf_importance#

Returns Random Forest feature importances (mean decrease in impurity) as a tidy DataFrame:

from twiga.core.data.selection import compute_rf_importance

importance_df = compute_rf_importance(
    data=df,
    features=feature_columns,
    target="load_mw",
    task="regression",
)
# Returns a long-format DataFrame with columns: feature, importance

TimeseriesDataModule#

For neural network models, the pipeline’s NumPy arrays are fed into a PyTorch Lightning DataModule that handles batching, shuffling, and GPU transfer.

from twiga.core.data.loader import TimeseriesDataModule

dm = TimeseriesDataModule(
    train_inputs=train_features,    # np.ndarray
    train_targets=train_targets,    # np.ndarray
    val_inputs=val_features,        # np.ndarray (optional)
    val_targets=val_targets,        # np.ndarray (optional)
    batch_size=64,
    num_workers=4,
    pin_memory=True,
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| train_inputs | np.ndarray | required | Feature array for training. |
| train_targets | np.ndarray | required | Target array for training. |
| val_inputs | np.ndarray \| None | None | Feature array for validation. |
| val_targets | np.ndarray \| None | None | Target array for validation. |
| batch_size | int | 64 | Samples per batch. |
| num_workers | int | 1 | Parallel data-loading workers. |
| persistent_workers | bool | True | Keep worker processes alive between epochs (requires num_workers > 0). |
| pin_memory | bool | True | Pin tensors in page-locked memory for faster GPU transfer. |

Note

TimeseriesDataModule is used internally by TwigaForecaster when training neural network models. You typically do not need to instantiate it directly.

Complete Example#

import pandas as pd
from sklearn.preprocessing import StandardScaler, RobustScaler

from twiga.core.data.pipeline import DataPipeline

# 1. Load data
df = pd.read_parquet("data/load_timeseries.parquet")
# Expected columns: timestamp, load_mw, temperature, ghi

# 2. Create the pipeline
pipeline = DataPipeline(
    target_feature="load_mw",
    period="30min",
    lookback_window_size=96,       # 2 days of 30-min data
    forecast_horizon=48,           # predict 1 day ahead
    latitude=-6.8,
    longitude=39.3,
    historical_features=["temperature"],
    calendar_features=["hour", "dayofweek", "day_night"],
    exogenous_features=["ghi"],
    input_scaler=StandardScaler(),
    target_scaler=RobustScaler(),
    lags=[1, 24, 48],
    windows=[24],
    window_funcs=["mean", "std"],
)

# 3. Fit on training data
train_df = df[df.timestamp <= "2024-06-01"]
pipeline.fit(train_df)

# 4. Transform training and test data
train_features, train_targets = pipeline.transform(train_df)
print(f"Train features: {train_features.shape}")
# e.g. (num_samples, 96, num_features)
print(f"Train targets:  {train_targets.shape}")
# e.g. (num_samples, 48, 1)

test_df = df[df.timestamp > "2024-06-01"]
test_features, test_targets = pipeline.transform(test_df)

# 5. Get aligned ground truth for evaluation
timestamps, ground_truth = pipeline.get_ground_truth_sequences(test_df)

Typical workflow

The example above shows the low-level API. In most cases you will use DataPipelineConfig together with TwigaForecaster, which handles pipeline construction, fitting, and transformation automatically:

```python
from sklearn.preprocessing import RobustScaler, StandardScaler

from twiga.core.config import DataPipelineConfig

data_config = DataPipelineConfig(
    target_feature="load_mw",
    period="30min",
    lookback_window_size=96,
    forecast_horizon=48,
    calendar_features=["hour", "dayofweek", "day_night"],
    exogenous_features=["ghi"],
    latitude=-6.8,
    longitude=39.3,
    input_scaler=StandardScaler(),
    target_scaler=RobustScaler(),
)
```

See the [Quick Start Guide](../getting-started/quickstart.md) for the full workflow.

API Reference#

Pipeline & transformers#

class twiga.core.data.pipeline.DataPipeline(target_feature, period, lookback_window_size, forecast_horizon, latitude=None, longitude=None, historical_features=None, calendar_features=None, exogenous_features=None, future_covariates=None, input_scaler=None, target_scaler=None, lags=None, windows=None, window_funcs=None, date_column='timestamp', stride=1)#

Bases: TransformerMixin, BaseEstimator

A transformer for preparing time series forecasting datasets.

Handles feature engineering, scaling, and temporal structure creation for forecasting models. Supports lagged features, rolling statistics, temporal features (e.g., hour, day/night), and Fourier transformations for cyclical patterns.

Variables:
  • target_feature (list[str]) – Names of target variable(s) to forecast.

  • historical_features (list[str]) – Historical features with unknown future values.

  • calendar_features (list[str]) – Cyclical temporal features (e.g., hour, dayofweek).

  • exogenous_features (list[str]) – Known future features available over the full horizon.

  • future_covariates (list[str]) – Known future features available over forecast horizon only.

  • input_scaler (Transformer) – Feature scaler for input features (default: FunctionTransformer).

  • target_scaler (Transformer) – Feature scaler for target(s) (default: FunctionTransformer).

  • lags (list[int]) – Lag intervals for feature engineering.

  • windows (list[int]) – Window sizes for rolling statistics.

  • window_funcs (list[str]) – Functions for rolling window calculations (e.g., ‘mean’).

  • period (str) – Sampling frequency as a pandas offset alias (e.g., ‘30min’, ‘1h’).

  • lookback_window_size (int) – Number of past observations per sample.

  • forecast_horizon (int) – Number of future steps to predict.

  • latitude (float) – Latitude for day/night feature calculation.

  • longitude (float) – Longitude for day/night feature calculation.

  • date_column (str) – Name of datetime column (default: ‘timestamp’).

  • n_samples (int) – Samples per day calculated from period.

  • max_data_drop (int) – Maximum data loss from feature engineering steps.

  • exog_periods (list[int]) – Cycle lengths for calendar features.

  • data_pipeline (Pipeline) – Built preprocessing pipeline.

  • data (pd.DataFrame) – Transformed data stored for extraction methods.

__init__(target_feature, period, lookback_window_size, forecast_horizon, latitude=None, longitude=None, historical_features=None, calendar_features=None, exogenous_features=None, future_covariates=None, input_scaler=None, target_scaler=None, lags=None, windows=None, window_funcs=None, date_column='timestamp', stride=1)#

Initializes the DataPipeline transformer with data preparation parameters.

fit(data, y=None)#

Fits the data pipeline to the provided time series data.

fit_transform(X, y=None)#

Fits the transformer to the data and transforms it.

Return type:

tuple[ndarray, ndarray]

get_ground_truth_sequences(data)#

Extract ground truth sequences aligned with the feature extraction window.

This is the pure data extraction method: it applies the same preparation steps as transform_features() so that the returned timestamps and targets have the same first dimension as the feature arrays, but it does not load or modify any model checkpoint.

Prefer this method when checkpoint side effects are undesirable - for example inside forecast(). For evaluation workflows that also need the checkpoint restored, use get_ground_truth() instead.

Parameters:

data (DataFrame) – DataFrame containing the raw input data including the date column and target feature(s).

Return type:

tuple[ndarray, ndarray]

Returns:

Tuple of

  • timestamps: Integer (nanosecond) timestamp array aligned with the prediction sequences.

  • targets: Scaled target sequences of shape (num_sequences, forecast_horizon, num_targets).

prepare_data(data)#

Prepares the data by applying the pipeline and sorting.

Return type:

None

set_feature_covariate_column()#

Sets feature and covariate column lists.

set_fit_request(*, data='$UNCHANGED$')#

Configure whether metadata should be requested to be passed to the fit method.

Note that this method is only relevant when this estimator is used as a sub-estimator within a meta-estimator and metadata routing is enabled with enable_metadata_routing=True (see sklearn.set_config()). Please check the User Guide on how the routing mechanism works.

The options for each parameter are:

  • True: metadata is requested, and passed to fit if provided. The request is ignored if metadata is not provided.

  • False: metadata is not requested and the meta-estimator will not pass it to fit.

  • None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.

  • str: metadata should be passed to the meta-estimator with this given alias instead of the original name.

The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.

Added in version 1.3.

Parameters:

data (str, True, False, or None; default=sklearn.utils.metadata_routing.UNCHANGED) – Metadata routing for the data parameter in fit.

Returns:

self (object) – The updated object.

set_transform_request(*, data='$UNCHANGED$')#

Configure whether metadata should be requested to be passed to the transform method.

Note that this method is only relevant when this estimator is used as a sub-estimator within a meta-estimator and metadata routing is enabled with enable_metadata_routing=True (see sklearn.set_config()). Please check the User Guide on how the routing mechanism works.

The options for each parameter are:

  • True: metadata is requested, and passed to transform if provided. The request is ignored if metadata is not provided.

  • False: metadata is not requested and the meta-estimator will not pass it to transform.

  • None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.

  • str: metadata should be passed to the meta-estimator with this given alias instead of the original name.

The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.

Added in version 1.3.

Parameters:

data (str, True, False, or None; default=sklearn.utils.metadata_routing.UNCHANGED) – Metadata routing for the data parameter in transform.

Returns:

self (object) – The updated object.

transform(data)#

Transforms the data into features and targets for forecasting.

Return type:

tuple[ndarray, ndarray]

transform_features(data)#

Transforms the data into features for forecasting or prediction.

Return type:

ndarray

transform_targets(data)#

Transforms the data into targets for forecasting.

Return type:

ndarray

class twiga.core.data.temporal.TemporalFeatureTransformer(calendar_features=None, latitude=None, longitude=None, date_column='timestamp')#

Bases: TransformerMixin, BaseEstimator

A transformer for adding temporal and calendar features to time series data.

Adds temporal features (e.g., hour, day), day/night indicators, and Fourier transformations for cyclical calendar features.

Parameters:
  • calendar_features (list[str] | str | None) – Temporal features to extract (e.g., ‘hour’, ‘dayofweek’). Converted to list if str. Default is None.

  • latitude (float | None) – Latitude for day/night calculation. Required if ‘day_night’ is included. Default is None.

  • longitude (float | None) – Longitude for day/night calculation. Required if ‘day_night’ is included. Default is None.

  • date_column (str) – Name of the datetime column in the input data. Default is ‘timestamp’.

Variables:
  • calendar_features (list[str]) – Normalized list of calendar features.

  • calendar_trig (list[str]) – Trigonometric calendar features (e.g., ‘hour’).

  • calendar_non_trig (list[str]) – Non-trigonometric calendar features (e.g., ‘day_night’).

  • latitude (float | None) – Latitude for day/night feature.

  • longitude (float | None) – Longitude for day/night feature.

  • date_column (str) – Datetime column name.

  • exog_periods (list[int] | None) – Periods for Fourier features, set during fit.

Raises:

ValueError – If parameters are invalid or required columns/features are missing.

fit(X, y=None)#

Fit the transformer by determining periods for Fourier features.

Parameters:
  • X (DataFrame) – Input data with date_column and calendar_features.

  • y (None) – Ignored, for sklearn compatibility.

Returns:

self – Fitted transformer.

Raises:

ValueError – If required columns are missing or insufficient unique values for Fourier features.

fit_transform(X, y=None)#

Fits the transformer to the data and transforms it.

Return type:

DataFrame

transform(X)#

Transform the input data by adding temporal and calendar features.

Parameters:

X (DataFrame) – Input data with date_column and optionally calendar_features.

Return type:

DataFrame

Returns:

pd.DataFrame – Transformed data with added features.

Raises:

ValueError – If X is invalid or transformation fails.

class twiga.core.data.autores.AutoregressTransformer(n_samples=1, lags=None, windows=None, window_funcs=None, date_column='timestamp', value_column='value')#

Bases: TransformerMixin, BaseEstimator

A transformer for adding autoregressive features to time series data.

Applies lagged features and optionally rolling statistics to a specified value column using the pytimetk library.

Parameters:
  • n_samples (int) – Number of samples per period (e.g., 24 for hourly data in a day). Must be positive. Default is 1.

  • lags (list[int] | int | None) – Lag intervals to create. Converted to list if int. Must be positive integers. Default is None.

  • windows (list[int] | int | None) – Window sizes for rolling statistics. Converted to list if int. Must be positive integers. Default is None.

  • window_funcs (list[str] | str | None) – Functions for rolling stats (e.g., ‘mean’, ‘sum’). Converted to list if str. Default is None.

  • date_column (str) – Name of the datetime column in the input data. Default is ‘timestamp’.

  • value_column (str) – Name of the column to apply transformations to. Default is ‘value’.

Variables:
  • lags (list[int]) – Normalized list of lag intervals.

  • windows (list[int]) – Normalized list of window sizes.

  • window_funcs (list[str]) – Normalized list of window functions.

  • n_samples (int) – Number of samples per period.

  • max_data_drop (int) – Maximum number of rows dropped due to lagging or rolling.

Raises:

ValueError – If n_samples, lags, or windows are not positive, or if parameters are invalid.

fit(X, y=None)#

Fit the transformer. Validates input data and returns self.

Parameters:
  • X (DataFrame) – Input data with date_column and value_column.

  • y (None) – Ignored, for sklearn compatibility.

Returns:

self – Fitted transformer.

Raises:

ValueError – If required columns are missing from X.

fit_transform(X, y=None)#

Fits the transformer to the data and transforms it.

Return type:

DataFrame

get_generated_column_names()#

Return the column names that transform() will add, without needing data.

The names are derived deterministically from the constructor parameters and the n_samples period multiplier, so they are available as soon as the transformer is instantiated.

Return type:

list[str]

Returns:

List of new column names in the order they are added – lags first (ascending), then rolling features (ascending window × func order).

transform(X)#

Transform the input data by adding lagged and rolling features.

Parameters:

X (DataFrame) – Input data with date_column and value_column.

Return type:

DataFrame

Returns:

pd.DataFrame – Transformed data with added features.

Raises:

ValueError – If X is invalid or transformation fails.

class twiga.core.data.diff.TimeSeriesDifferentiator(order=1)#

Bases: BaseEstimator, TransformerMixin

Transforms time series data using N-th order differentiation.

This transformer calculates the difference between consecutive points to remove trends. It stores the necessary boundary values to allow for perfect reconstruction (inverse transformation) of the original scale.

Variables:
  • order (int) – The number of times differentiation is applied.

  • history (dict) – Internal storage for boundary values (initial and last) needed to reverse the transformation.

__init__(order=1)#

Initializes the transformer.

Parameters:

order (int) – The order of differentiation. Must be >= 1.

fit(X, y=None)#

Stores anchor values needed to reverse differentiation.

Parameters:
  • X (ndarray) – 1D NumPy array representing the time series.

  • y (Any) – Ignored. Included for Scikit-Learn API compatibility.

Return type:

Self

Returns:

Self – The fitted transformer instance.

forecast_inverse(predictions)#

Reverts differentiation for future predictions.

Uses the ‘last_values’ from the training set to project predictions back into the original data scale.

Parameters:

predictions (ndarray) – Differentiated predicted values.

Return type:

ndarray

Returns:

np.ndarray – Predictions scaled to the original series.

inverse_transform(X)#

Reverts differentiation using stored initial values.

Parameters:

X (ndarray) – Differentiated 1D NumPy array (can contain leading NaNs).

Return type:

ndarray

Returns:

np.ndarray – The reconstructed original time series.

transform(X)#

Applies N-th order differentiation.

Parameters:

X (ndarray) – 1D NumPy array to transform.

Return type:

ndarray

Returns:

np.ndarray – Differentiated series, padded with NaNs to maintain the original input length.
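The transform, inverse_transform, and forecast_inverse behaviour described for this class can be illustrated for order=1 with NumPy. This is a sketch of the underlying arithmetic only; it does not use TimeSeriesDifferentiator or reproduce its internal history dictionary, and the variable names are illustrative.

```python
import numpy as np

series = np.array([10.0, 12.0, 15.0, 19.0, 24.0])

# transform: first-order difference, NaN-padded to keep the original length
diffed = np.concatenate([[np.nan], np.diff(series)])

# inverse_transform: cumulative sum anchored at the stored initial value
initial_value = series[0]
restored = np.concatenate(
    [[initial_value], initial_value + np.cumsum(diffed[1:])]
)

# forecast_inverse: anchor predicted differences at the last training value
last_value = series[-1]
predicted_diffs = np.array([6.0, 7.0])
forecast = last_value + np.cumsum(predicted_diffs)

print(restored)
print(forecast)
```

For higher orders the same steps are applied repeatedly, which is why one boundary value per order has to be stored at fit time.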

class twiga.core.data.loader.TimeseriesDataModule(train_inputs, train_targets, val_inputs=None, val_targets=None, batch_size=64, num_workers=1, persistent_workers=True, pin_memory=True)#

Bases: LightningDataModule

A PyTorch Lightning DataModule for managing time series data loading and processing.

Handles dataset creation, automatic tensor conversion, and provides configurable data loaders for training and validation. Designed for seamless integration with PyTorch Lightning workflows.

Parameters:
  • train_inputs (ndarray) – Input features for training data. Shape: (num_samples, num_features)

  • train_targets (ndarray) – Target values for training data. Shape: (num_samples, …)

  • val_inputs (ndarray | None) – Input features for validation data. Defaults to None.

  • val_targets (ndarray | None) – Target values for validation data. Defaults to None.

  • batch_size (int) – Number of samples per batch. Defaults to 64.

  • num_workers (int) – Parallel workers for data loading. Defaults to 1.

  • persistent_workers (bool) – Maintain worker processes between epochs. Requires num_workers > 0. Defaults to True.

  • pin_memory (bool) – Enable fast GPU transfer for CUDA devices. Defaults to True.

Raises:
  • ValueError – If validation inputs/targets are partially provided

  • TypeError – If input arrays are not numpy ndarrays

Variables:
  • train_dataset (TensorDataset) – Training dataset containing (inputs, targets) tensors

  • val_dataset (TensorDataset, optional) – Validation dataset if validation data provided

Example

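>>> import numpy as np
>>> from twiga.core.data.loader import TimeseriesDataModule
>>> train_inputs = np.random.rand(100, 10).astype(np.float32)
>>> train_targets = np.random.rand(100, 1).astype(np.float32)
>>> dm = TimeseriesDataModule(train_inputs, train_targets, batch_size=32, num_workers=4)
>>> # `trainer` (a Lightning Trainer) and `model` (a LightningModule) are assumed to exist.
>>> trainer.fit(model, dm)
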
__init__(train_inputs, train_targets, val_inputs=None, val_targets=None, batch_size=64, num_workers=1, persistent_workers=True, pin_memory=True)#

Initialize data module with training/validation data and loading parameters.

setup(stage=None)#

Create tensor datasets from numpy arrays.

Automatically called by Lightning during trainer.fit(). Converts numpy arrays to PyTorch tensors and creates TensorDataset objects for training/validation.

Parameters:

stage (str | None) – Current pipeline stage (‘fit’, ‘validate’, etc). Defaults to None.

Return type:

None

train_dataloader()#

Generate training data loader with configured batching and shuffling.

Return type:

DataLoader

Returns:

DataLoader

Configured loader for training data with:
  • Random shuffling between epochs

  • Specified batch size

  • Parallel workers if num_workers > 0

  • Pinned memory for GPU acceleration

val_dataloader()#

Generate validation data loader if validation data exists.

Return type:

list[DataLoader] | list

Returns:

list[DataLoader] | list

A list containing the configured validation loader, or an empty list when no validation data was provided. The loader uses:
  • Sequential ordering

  • Same batch size as training

  • Parallel workers if num_workers > 0

  • Pinned memory for GPU acceleration
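
To make the difference between the two loaders concrete, the following NumPy-only sketch mimics the batching behaviour described above: shuffled order for training, sequential order for validation. It deliberately avoids PyTorch; `iter_batches` is a hypothetical helper and is not how the DataModule is implemented.

```python
import numpy as np


def iter_batches(inputs, targets, batch_size=64, shuffle=False, seed=0):
    """Yield (inputs, targets) batches, optionally in shuffled order."""
    n = len(inputs)
    if shuffle:
        order = np.random.default_rng(seed).permutation(n)
    else:
        order = np.arange(n)
    for start in range(0, n, batch_size):
        idx = order[start:start + batch_size]
        # Inputs and targets are indexed together so pairs stay aligned.
        yield inputs[idx], targets[idx]


x = np.arange(10, dtype=np.float32).reshape(10, 1)
y = x * 2
val_batches = list(iter_batches(x, y, batch_size=4))  # sequential
train_batches = list(iter_batches(x, y, batch_size=4, shuffle=True))
```

The last batch is smaller than `batch_size` whenever the sample count is not an exact multiple of it.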

Feature engineering helpers#

twiga.core.data.feature.get_fourier_series(dates, period, series_order=1)#

Compute Fourier series components for seasonality modeling, identical to Prophet’s implementation.

Parameters:
  • dates (Series | ndarray | list) – Timestamps or time values. If not datetime-like, assumed to be days since epoch (1970-01-01).

  • period (float) – Period of the seasonality in days (e.g., 365.25 for yearly, 7 for weekly).

  • series_order (int) – Number of Fourier components (sine and cosine pairs).

Return type:

ndarray

Returns:

np.ndarray – Matrix of shape (len(dates), 2 * series_order) where each column is a sine or cosine term: [sin(2πt/period), cos(2πt/period), sin(4πt/period), …].

Raises:

ValueError – If period <= 0, series_order < 0, or dates is empty/invalid.
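
The column layout described above can be reproduced with a few lines of NumPy. The sketch below assumes the time values are already numeric days; `fourier_terms` is a hypothetical stand-in and does not handle the datetime conversion that get_fourier_series performs.

```python
import numpy as np


def fourier_terms(t_days, period, series_order=1):
    """Return [sin, cos] pairs for harmonics 1..series_order."""
    t = np.asarray(t_days, dtype=float)
    if period <= 0 or series_order < 0 or t.size == 0:
        raise ValueError("period must be > 0, series_order >= 0, dates non-empty")
    columns = [
        fn(2.0 * np.pi * (k + 1) * t / period)
        for k in range(series_order)
        for fn in (np.sin, np.cos)  # sine first, then cosine, per harmonic
    ]
    if not columns:
        return np.empty((t.size, 0))
    return np.column_stack(columns)


# Weekly seasonality with two harmonics over seven consecutive days.
features = fourier_terms(np.arange(7), period=7.0, series_order=2)
```

Each additional harmonic adds two columns, so a higher `series_order` lets the model fit sharper seasonal shapes at the cost of more features.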

twiga.core.data.feature.add_fourier_features(data, calendar_variables, periods)#

Add sine/cosine (Fourier) encodings of the given calendar variables.

Parameters:
  • data (DataFrame) – Input data.

  • date_col_name (str) – Name of the datetime column (unused here, kept for consistency).

  • periods (list[int]) – Periods for each trigonometric feature.

  • calendar_variables (list[str]) – Names of the calendar variables.

Return type:

DataFrame

Returns:

pd.DataFrame – Data with added sin/cos columns.

twiga.core.data.feature.add_day_night_feature(data, latitude, longitude, date_col_name='timestamp')#

Add a day/night feature to the dataset based on sunrise/sunset times.

Parameters:
  • data (DataFrame) – Input dataset with a datetime column.

  • latitude (float) – Latitude of the location.

  • longitude (float) – Longitude of the location.

  • date_col_name (str) – Name of the datetime column. Defaults to “timestamp”.

Return type:

DataFrame

Returns:

Dataset with a “day_night” column (1 for day, 0 for night).

Raises:
  • ValueError – If date_col_name is not datetime-like.

  • KeyError – If date_col_name is not in the DataFrame.

twiga.core.data.feature.get_sunrise_sunset(start_date, end_date, latitude=32.738274, longitude=-16.738519)#

Get the sunrise and sunset times for a given location and period of time.

Parameters:
  • start_date (Timestamp) – Start date (e.g., “YYYY-MM-DD” or datetime.date object).

  • end_date (Timestamp) – End date (e.g., “YYYY-MM-DD” or datetime.date object).

  • latitude (float) – Latitude of the location. Defaults to 32.738274.

  • longitude (float) – Longitude of the location. Defaults to -16.738519.

Return type:

dict[str, dict[str, Timestamp]]

Returns:

Dictionary with dates as keys and dicts of sunrise/sunset times as values.
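
A dictionary of this shape is enough to derive a day/night indicator. The sketch below uses only the standard library; the date-string key format, the "sunrise"/"sunset" key names, and the times themselves are assumptions made for illustration, and `day_night_flag` is a hypothetical helper.

```python
from datetime import datetime

# Illustrative values only; real times depend on location and date.
sun_times = {
    "2023-06-01": {
        "sunrise": datetime(2023, 6, 1, 6, 0),
        "sunset": datetime(2023, 6, 1, 20, 0),
    },
}


def day_night_flag(timestamp, sun_times):
    """Return 1 when timestamp falls between sunrise and sunset, else 0."""
    day = sun_times[timestamp.strftime("%Y-%m-%d")]
    return int(day["sunrise"] <= timestamp < day["sunset"])


# 03:00 is night, 12:00 is day, 21:00 is night.
flags = [day_night_flag(datetime(2023, 6, 1, h), sun_times) for h in (3, 12, 21)]
```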

twiga.core.data.feature.compute_netload_ghi(load, ghi, num_samples_per_day)#

Compute the net load by subtracting normalized GHI from normalized load.

Parameters:
  • load (np.ndarray) – Array of load values.

  • ghi (np.ndarray) – Array of GHI (Global Horizontal Irradiance) values.

  • num_samples_per_day (int) – Number of samples per day.

Returns:

np.ndarray – Net load (normalized load - normalized GHI) for valid segments.

Processing utilities#

twiga.core.data.processing.normalise_timestamp_column(df, col)#

Normalise a timestamp column to tz-naive datetime64[ns].

This is the single canonical entry point for datetime normalisation across the library. It accepts any of the three forms a timestamp column can arrive in and always returns the same shape so that all downstream transformers can assume tz-naive data:

  • tz-naive datetime64[ns] - returned unchanged (fast path).

  • tz-aware DatetimeTZDtype (e.g. UTC from pd.to_datetime(..., utc=True)) - converted to UTC wall-clock time then timezone info is dropped.

  • object / string - parsed with utc=True so that any embedded timezone offset is honoured, then timezone info is dropped.

Parameters:
  • df (DataFrame) – Input DataFrame. A copy is returned only when the column is modified; the original is returned unchanged for the fast path.

  • col (str) – Name of the timestamp column.

Return type:

DataFrame

Returns:

DataFrame with col as tz-naive datetime64[ns].

Raises:
  • KeyError – If col is not present in df.

  • ValueError – If the column cannot be parsed as datetime.
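
The three input forms listed above map onto a small amount of pandas code. The following is a minimal sketch of that logic under the stated behaviour, not the library's source; `to_tz_naive` is a hypothetical name.

```python
import pandas as pd


def to_tz_naive(df, col):
    """Return df with `col` as tz-naive datetimes expressed in UTC."""
    series = df[col]  # raises KeyError when the column is missing
    if pd.api.types.is_datetime64_any_dtype(series):
        if series.dt.tz is None:
            return df  # fast path: already tz-naive, no copy made
        naive = series.dt.tz_convert("UTC").dt.tz_localize(None)
    else:
        # Strings/objects: honour embedded offsets, then drop the tz info.
        naive = pd.to_datetime(series, utc=True).dt.tz_localize(None)
    out = df.copy()
    out[col] = naive
    return out


raw = pd.DataFrame(
    {"timestamp": ["2023-01-01T12:00:00+02:00", "2023-01-01T10:30:00+00:00"]}
)
clean = to_tz_naive(raw, "timestamp")
```

Converting to UTC before dropping the timezone keeps timestamps from different offsets comparable on a single clock.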

twiga.core.data.processing.augment_timeseries_signature(data, date_column, reduce_memory=False, engine='pandas')#

Add 29 datetime-based features to a pandas DataFrame or GroupBy object.

This function takes a DataFrame and a date column, extracting 29 time series features such as year, month, day, and more, adding them as new columns prefixed with the date_column name.

Parameters:
  • data (DataFrame | DataFrameGroupBy) – Input DataFrame or GroupBy object containing the time series data.

  • date_column (str) – Name of the column containing datetime values.

  • reduce_memory (bool) – If True, optimize DataFrame memory usage by converting to smaller dtypes. Defaults to False.

  • engine (str) – Engine to use for feature generation. Only ‘pandas’ is supported. Defaults to ‘pandas’.

Return type:

DataFrame

Returns:

DataFrame with 29 new datetime features added.

Raises:
  • TypeError – If data is not a pandas DataFrame or GroupBy object.

  • ValueError – If date_column is invalid or engine is not ‘pandas’.

Features:
  • _index_num: Datetime as seconds since epoch.

  • _year: Year of the datetime.

  • _year_iso: ISO year.

  • _yearstart: 1 if first day of year, 0 otherwise.

  • _yearend: 1 if last day of year, 0 otherwise.

  • _leapyear: 1 if leap year, 0 otherwise.

  • _half: Half year (1 for Jan-Jun, 2 for Jul-Dec).

  • _quarter: Quarter (1 to 4).

  • _quarteryear: Year and quarter (e.g., ‘2023Q1’).

  • _quarterstart: 1 if first day of quarter, 0 otherwise.

  • _quarterend: 1 if last day of quarter, 0 otherwise.

  • _month: Month number (1 to 12).

  • _month_lbl: Month name (e.g., ‘January’).

  • _monthstart: 1 if first day of month, 0 otherwise.

  • _monthend: 1 if last day of month, 0 otherwise.

  • _yweek: ISO week of the year.

  • _mweek: Week of the month.

  • _wday: Day of week (1=Monday, 7=Sunday).

  • _wday_lbl: Day of week name (e.g., ‘Monday’).

  • _mday: Day of month.

  • _qday: Day of quarter.

  • _yday: Day of year.

  • _weekend: 1 if weekend (Sat/Sun), 0 otherwise.

  • _hour: Hour of day.

  • _minute: Minute of hour.

  • _second: Second of minute.

  • _msecond: Microsecond.

  • _nsecond: Nanosecond.

  • _am_pm: ‘am’ or ‘pm’.
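
For a sense of how a few of these features are derived, the sketch below computes a subset for a single timestamp using only the standard library. It is illustrative rather than a reimplementation: `signature_subset` and the `prefix` default are hypothetical, and the real function operates on whole DataFrame columns.

```python
from datetime import datetime


def signature_subset(ts, prefix="timestamp"):
    """Compute a handful of the calendar features for one datetime."""
    return {
        f"{prefix}_year": ts.year,
        f"{prefix}_half": 1 if ts.month <= 6 else 2,
        f"{prefix}_quarter": (ts.month - 1) // 3 + 1,
        f"{prefix}_month": ts.month,
        f"{prefix}_wday": ts.isoweekday(),  # 1 = Monday, 7 = Sunday
        f"{prefix}_weekend": int(ts.isoweekday() >= 6),
        f"{prefix}_hour": ts.hour,
        f"{prefix}_am_pm": "am" if ts.hour < 12 else "pm",
    }


# 1 July 2023 was a Saturday.
row = signature_subset(datetime(2023, 7, 1, 15, 30))
```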

twiga.core.data.processing.augment_rolling(data, date_column, value_column, window_func='mean', window=2, min_periods=None, center=False, threads=1, show_progress=True, reduce_memory=False, **kwargs)#

Apply rolling window functions to a pandas DataFrame or GroupBy object.

This function sorts the data by the date column and applies one or more rolling window functions (e.g., mean, sum, or custom functions) to the specified value column(s) with given window sizes. Parallel processing is used for GroupBy objects when threads > 1, but may have overhead for small datasets.

Parameters:
  • data (DataFrame | DataFrameGroupBy) – Input DataFrame or GroupBy object to process.

  • date_column (str) – Name of the column containing dates, used for sorting.

  • value_column (str | list[str]) – Column name or list of column names to apply rolling functions to.

  • window_func (str | list[str | tuple[str, Callable[..., Any]]]) – Function(s) to apply. Can be a string (e.g., “mean”, “sum”), or a list of strings or tuples of (name, callable). Defaults to “mean”.

  • window (int | tuple[int, int] | list[int]) – Size(s) of the rolling window(s). Can be an integer, tuple (range of sizes), or list of integers. Defaults to 2.

  • min_periods (int | None) – Minimum observations in window to produce a value. Defaults to window size.

  • center (bool) – If True, center the rolling window. Defaults to False (trailing window).

  • threads (int) – Number of threads for parallel processing. Use -1 for all cores, 1 for serial. Defaults to 1.

  • show_progress (bool) – If True, display a progress bar during processing. Defaults to True.

  • reduce_memory (bool) – If True, optimize DataFrame memory usage. Defaults to False.

  • **kwargs – Additional arguments to pass to pandas rolling functions.

Return type:

DataFrame

Returns:

DataFrame with new columns for each function, window size, and value column, sorted by original index.

Raises:
  • TypeError – If data, window_func, or window is of an invalid type.

  • ValueError – If date_column, value_column, window, or threads are invalid.

Examples

>>> import pandas as pd
>>> from twiga.core.data.processing import augment_rolling
>>> df = pd.DataFrame(
...     {
...         "date": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
...         "value": [10, 20, 30],
...         "id": ["A", "A", "A"],
...     }
... )
>>> rolled_df = augment_rolling(
...     df,
...     date_column="date",
...     value_column="value",
...     window_func=["mean", ("range", lambda x: x.max() - x.min())],
...     window=[2, 3],
... )
>>> print(rolled_df)
        date  value id  value_rolling_mean_win_2  value_rolling_mean_win_3  value_rolling_range_win_2
0 2023-01-01     10  A                       NaN                       NaN                        NaN
1 2023-01-02     20  A                      15.0                       NaN                       10.0
2 2023-01-03     30  A                      25.0                      20.0                       10.0
>>> rolled_grouped = augment_rolling(
...     df.groupby("id"), date_column="date", value_column="value", window_func="sum", window=(1, 2)
... )
>>> print(rolled_grouped)
        date  value id  value_rolling_sum_win_1  value_rolling_sum_win_2
0 2023-01-01     10  A                     10.0                     10.0
1 2023-01-02     20  A                     20.0                     30.0
2 2023-01-03     30  A                     30.0                     50.0
twiga.core.data.processing.augment_lags(data, date_column, value_column, lags=1, reduce_memory=False)#

Add lagged columns to a pandas DataFrame or GroupBy object.

This function takes a DataFrame or GroupBy object, sorts it by the specified date column, and adds lagged versions of the specified value column(s) based on the provided lags.

Parameters:
  • data (DataFrame | DataFrameGroupBy) – Input DataFrame or GroupBy object to add lagged columns to.

  • date_column (str) – Name of the column containing dates, used for sorting.

  • value_column (str | list[str]) – Column name or list of column names to add lagged values for.

  • lags (int | tuple[int, int] | list[int]) – Number of lagged values to add. Can be an integer (single lag), tuple (range of lags), or list (specific lags). Defaults to 1.

  • reduce_memory (bool) – If True, optimize DataFrame memory usage by adjusting data types. Defaults to False.

Return type:

DataFrame

Returns:

DataFrame with lagged columns added, sorted by original index.

Raises:
  • TypeError – If data or lags is of an invalid type.

  • ValueError – If date_column or value_column is invalid.

Examples

>>> import pandas as pd
>>> from twiga.core.data.processing import augment_lags
>>> df = pd.DataFrame(
...     {
...         "date": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
...         "value": [10, 20, 30],
...         "id": ["A", "A", "A"],
...     }
... )
>>> lagged_df = augment_lags(df, date_column="date", value_column="value", lags=[1, 2])
>>> print(lagged_df)
        date  value id  value_lag_1  value_lag_2
0 2023-01-01     10  A          NaN          NaN
1 2023-01-02     20  A         10.0          NaN
2 2023-01-03     30  A         20.0         10.0
twiga.core.data.processing.stack_features(past_features, future_covariates=None)#

Combines past features and future covariates into a single feature array.

Parameters:
  • past_features (ndarray) – A 2D or 3D NumPy array of past exogenous features. If 2D, it is reshaped to 3D. Expected shape is (num_timesteps, num_features) for 2D or (batch_size, num_timesteps, num_features) for 3D.

  • future_covariates (ndarray | None) – An optional 2D or 3D NumPy array of future covariates. If 2D, it is reshaped to 3D. Expected shape is (num_timesteps, num_features) for 2D or (batch_size, num_timesteps, num_features) for 3D. Defaults to None.

Return type:

ndarray

Returns:

A 3D NumPy array with combined features and covariates along the time axis. Shape is (batch_size, total_timesteps, max_features), where total_timesteps is the sum of past and future timesteps (padded if necessary), and max_features is the maximum of past and covariate features (padded with zeros if needed).

Raises:
  • ValueError – If past_features or future_covariates have invalid dimensions (not 2D or 3D).

  • ValueError – If batch sizes mismatch after reshaping.

Example

>>> import numpy as np
>>> from twiga.core.data.processing import stack_features
>>> past = np.ones((96, 2))  # 96 timesteps, 2 features
>>> future = np.ones((48, 10))  # 48 timesteps, 10 features
>>> result = stack_features(past, future)
>>> print(result.shape)  # Output: (1, 144, 10)
>>> print(result[0, :2])  # First two timesteps of past
[[1. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 1. 0. 0. 0. 0. 0. 0. 0. 0.]]
>>> print(result[0, 96:98])  # First two timesteps of future
[[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
twiga.core.data.processing.unstack_features(combined_features, lookback_window_size, num_padding, forecast_horizon=None)#

Inverse operation to split combined features into past and future parts.

Parameters:
  • combined_features (ndarray) – 3D array of shape (batch_size, total_timesteps, max_features).

  • lookback_window_size (int) – Number of timesteps corresponding to past features.

  • num_padding (int) – If negative, drop last -num_padding features from past; if non-negative, drop last num_padding features from future.

  • forecast_horizon (int | None) – Number of timesteps corresponding to future features. If None, only past features are present.

Return type:

tuple[ndarray, ndarray | None]

Returns:

A tuple (past, future) where:
  • past is the past features (possibly trimmed).

  • future is the future features (trimmed as needed), or None.
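
The stack/unstack round trip can be sketched in NumPy as follows. The helpers `stack` and `unstack` are hypothetical, take 3D inputs only, and assume that `num_padding` equals the past feature count minus the future feature count, which is one reading consistent with the sign convention documented above.

```python
import numpy as np


def pad_features(x, width):
    """Zero-pad the last (feature) axis of a 3D array up to `width`."""
    return np.pad(x, ((0, 0), (0, 0), (0, width - x.shape[2])))


def stack(past, future):
    """Concatenate along time after padding both parts to equal width."""
    width = max(past.shape[2], future.shape[2])
    combined = np.concatenate(
        [pad_features(past, width), pad_features(future, width)], axis=1
    )
    num_padding = past.shape[2] - future.shape[2]  # assumed convention
    return combined, num_padding


def unstack(combined, lookback_window_size, num_padding):
    """Split along time, then trim the zero-padded feature columns."""
    past = combined[:, :lookback_window_size]
    future = combined[:, lookback_window_size:]
    if num_padding < 0:
        past = past[:, :, :num_padding]  # drop last -num_padding features
    elif num_padding > 0:
        future = future[:, :, :-num_padding]  # drop last num_padding features
    return past, future


past = np.ones((1, 96, 2))
future = np.ones((1, 48, 10))
combined, num_padding = stack(past, future)
past_out, future_out = unstack(combined, 96, num_padding)
```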

twiga.core.data.processing.get_n_sample_per_day(period)#

Calculate the number of samples taken per day based on the given period in minutes or hours.

This function uses pandas.Timedelta to interpret the period string (e.g., ‘30min’, ‘1h’) and calculates the number of samples per day based on 1440 minutes/day. Only units ‘min’ or ‘h’ are allowed (deprecated aliases ‘T’ and ‘H’ are accepted and normalised).

Parameters:

period (str) – The period string in pandas offset format (e.g., ‘30min’, ‘1h’).

Return type:

int

Returns:

int – The number of samples taken per day.

Raises:
  • TypeError – If the period is not a string.

  • ValueError – If the period is invalid or uses units other than ‘min’, ‘h’.

Examples

>>> get_n_sample_per_day("30min")
48
>>> get_n_sample_per_day("15min")
96
>>> get_n_sample_per_day("1h")
24
>>> get_n_sample_per_day("2h")
12
twiga.core.data.processing.detect_missing_date(dataset, period=30)#

Fill missing dates in a time series dataset with NaN values.

Parameters:
  • dataset (pd.DataFrame) – The input time series dataset with a datetime index.

  • period (int) – Frequency (split_freq), in minutes, of the new date range. Defaults to 30.

Returns:

pd.DataFrame – The input dataset with missing dates filled with NaN values.
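
One common way to obtain this behaviour in pandas is to reindex against a complete date range. The sketch below shows that approach under the assumption that the range spans the first to the last timestamp in the index; `fill_missing_timestamps` is a hypothetical name and may differ from what detect_missing_date does internally.

```python
import pandas as pd


def fill_missing_timestamps(dataset, period=30):
    """Reindex onto a regular grid so absent timestamps become NaN rows."""
    full_index = pd.date_range(
        dataset.index.min(), dataset.index.max(), freq=f"{period}min"
    )
    return dataset.reindex(full_index)


# The 01:00 reading is missing from this half-hourly series.
df = pd.DataFrame(
    {"load": [1.0, 2.0, 4.0]},
    index=pd.to_datetime(
        ["2023-01-01 00:00", "2023-01-01 00:30", "2023-01-01 01:30"]
    ),
)
filled = fill_missing_timestamps(df, period=30)
```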

Feature selection#

class twiga.core.data.relevance.AssociationAnalyzer(method='spearman', task='regression', **kwargs)

Bases: object

A unified interface for calculating and visualizing feature associations.

static compute(data, target_col, variable_cols, method='spearman', task='regression', **kwargs)

Calculate association between target and features using the chosen method.

Parameters:
  • data (DataFrame) – Input DataFrame.

  • target_col (str) – The dependent variable.

  • variable_cols (list[str]) – List of independent variables.

  • method (Literal['pearson', 'spearman', 'kendall', 'xicor', 'pps', 'mi', 'anova', 'chi2']) – The statistical method to use.

  • task (Literal['regression', 'classification']) – Task type (regression/classification) for PPS, MI, and ANOVA.

  • **kwargs (Any) – Method-specific arguments:
      - pps: sample, cross_validation, random_seed
      - xicor: ties
      - mi: random_state, n_neighbors
      - rank: N/A (standard pandas implementation)

Return type:

DataFrame

Returns:

DataFrame with columns [‘target’, ‘feature’, ‘score’] and optionally [‘p_value’].

static plot_heatmap(assoc_df, score_col='score', target_col_name='target', feature_col_name='feature')

Visualize association results using a heatmap.

Parameters:
  • assoc_df (DataFrame) – DataFrame returned by .compute().

  • score_col (str) – Column name to use for the heatmap values.

  • target_col_name (str) – Name of the column representing the Y axis (target).

  • feature_col_name (str) – Name of the column representing the X axis (feature).

twiga.core.data.selection.select_top_features(data, features, target, task='regression', top_k=5, alpha=0.05, rank_aggregation='borda_count', include_pps=True, include_xi=True, include_rf=True, include_chi2=True, random_state=42, return_scores=False)

Selects top-k features using a robust ensemble of statistical metrics.

Combines linear, non-linear, and model-based relevance scores. Note: Alpha filtering applies only to ANOVA and Chi-square p-values.

Return type:

list[str] | tuple[list[str], DataFrame]
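
The default rank_aggregation value refers to a Borda count, which can be illustrated generically: each metric ranks the features, a feature earns more points the higher it ranks, and points are summed across metrics. The sketch below is a plain-Python illustration of that voting scheme, not the library's implementation; `borda_top_k`, the metric names, and the scores are made up for the example, and the tie-breaking rule (alphabetical) is an assumption.

```python
def borda_top_k(scores_by_metric, top_k=5):
    """Aggregate per-metric feature scores with a Borda count."""
    points = {}
    for scores in scores_by_metric.values():
        ranked = sorted(scores, key=scores.get, reverse=True)
        n = len(ranked)
        for position, feature in enumerate(ranked):
            # Best feature gets n - 1 points, worst gets 0.
            points[feature] = points.get(feature, 0) + (n - 1 - position)
    # Highest total first; ties broken alphabetically for determinism.
    ordered = sorted(points, key=lambda f: (-points[f], f))
    return ordered[:top_k], points


# Illustrative scores only.
scores = {
    "spearman": {"temp": 0.8, "hour": 0.5, "wind": 0.1},
    "mi": {"temp": 0.6, "hour": 0.7, "wind": 0.2},
    "rf": {"temp": 0.5, "hour": 0.3, "wind": 0.2},
}
top, points = borda_top_k(scores, top_k=2)
```

Because only ranks are used, metrics on very different scales (correlations, importances, information scores) can be combined without normalisation.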

twiga.core.data.selection.compute_rf_importance(data, target, features, task='regression', n_estimators=50, random_state=42, n_jobs=-1, pivot=True)

Compute Random Forest feature importance using SelectFromModel.

Uses the mean decrease in impurity (MDI) across trees, with features selected internally via a mean-importance threshold. MDI is a model-based measure: unlike statistical tests, it captures non-linear interactions automatically.

Parameters:
  • data (DataFrame) – Input DataFrame.

  • target (str) – Target column.

  • features (str | list[str]) – Feature column(s).

  • task (str) – “regression” or “classification”.

  • n_estimators (int) – Number of trees in the forest. Defaults to 50.

  • random_state (int | None) – Random seed. Defaults to 42.

  • n_jobs (int) – Number of jobs for parallelisation. Defaults to -1 (all cores).

  • pivot (bool) – If True, pivot result with features as index. Defaults to True.

Return type:

DataFrame

Returns:

DataFrame with [“feature”, “rf_importance”] in long format, or pivoted with features as index.

Example

>>> import pandas as pd
>>> from twiga.core.data.selection import compute_rf_importance
>>> df = pd.DataFrame({"x1": [1, 2, 3], "x2": [4, 5, 6], "y": [0, 1, 0]})
>>> compute_rf_importance(df, "y", ["x1", "x2"], task="classification")

Signal characterisation#

class twiga.core.data.characterisation.SignalCharacteriser(config=None)#

Bases: object

Orchestrates the four Stage 1 diagnostic dimensions for a target series.

SignalCharacteriser is a pure diagnostic tool, not a pipeline step. It does not inherit from BaseEstimator and does not transform data. It consumes a target series, runs the four diagnostic routines (stationarity, complexity, temporal structure, and predictability), and returns a CharacterisationResult whose to_pipeline_hints() method feeds directly into DataPipeline construction.

Parameters:

config (CharacterisationConfig | None) – Diagnostic configuration. Defaults to CharacterisationConfig with all defaults when None.

Example:

config = CharacterisationConfig(n_samples_per_day=48)
characteriser = SignalCharacteriser(config)
result = characteriser.analyse(df["net_load"], target_column="net_load")

hints = result.to_pipeline_hints()
# hints == {"lags": [48, 96, 336], "lookback_window_size": 336,
#           "integration_order": 0}

summary_df = SignalCharacteriser.summary(result)
analyse(series, target_column='target')#

Run all four Stage 1 diagnostic dimensions on series.

The series should be drawn from the training split only to prevent data leakage. It must be sorted in chronological order, regularly spaced, and free of NaN values.

Parameters:
  • series (Series | ndarray) – 1-D target series values.

  • target_column (str) – Name label attached to the result for traceability. Defaults to "target".

Return type:

CharacterisationResult

Returns:

CharacterisationResult containing the full Stage 1 diagnostic output.

Raises:

ValueError – If series contains fewer than _MIN_STATIONARITY_OBS observations or is not 1-D.

static interpreted_summary(result, config=None)#

Return a theory-grounded summary with an Interpretation column.

Extends summary() with a third column that explains what each metric value means, how it was derived, and what it implies for the modelling pipeline. All interpretations are derived from result and config alone - no external globals are required.

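Parameters:
  • result – A CharacterisationResult produced by analyse().

  • config – Optional diagnostic configuration used when deriving the interpretations. Defaults to None.
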
Return type:

DataFrame

Returns:

pd.DataFrame with columns ["Dimension", "Value", "Interpretation"].

Example:

result = SignalCharacteriser().analyse(df["net_load"])
df = SignalCharacteriser.interpreted_summary(result)
static summary(result)#

Return a human-readable tabular summary of a characterisation result.

Produces a two-column DataFrame (Dimension, Value) suitable for display in a notebook or for logging as a text artefact.

Parameters:

result (CharacterisationResult) – A CharacterisationResult produced by analyse().

Return type:

DataFrame

Returns:

pd.DataFrame with columns ["Dimension", "Value"].

class twiga.core.data.characterisation.CharacterisationResult(**data)#

Bases: BaseModel

Complete Stage 1 characterisation of a target series.

Aggregates the outputs of the four diagnostic dimensions and exposes to_pipeline_hints() as the closure mechanism to Stage 4.

Parameters:
  • target_column (str) – Name of the target column that was characterised.

  • n_observations (int) – Total number of observations in the input series.

  • stationarity (StationarityResult) – Joint ADF/KPSS stationarity result.

  • complexity (list[ComplexityProfile]) – Complexity profiles for the full series, the stable demand regime, and the ramp-event regime (always in this order).

  • temporal (TemporalStructureResult) – ACF/PACF-based lag order and seasonal period estimates.

  • predictability (PredictabilityResult) – AMI-based forecastability classification across forecast horizons.

complexity: list[ComplexityProfile]#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

n_observations: int#
predictability: PredictabilityResult#
stationarity: StationarityResult#
target_column: str#
temporal: TemporalStructureResult#
to_pipeline_hints()#

Return DataPipeline constructor kwargs derived from Stage 1.

The returned dict can be unpacked directly into DataPipeline to close the Stage 1 to Stage 4 parametrisation loop:

hints = result.to_pipeline_hints()
pipe = DataPipeline(
    target_feature=["net_load"],
    period="30min",
    forecast_horizon=48,
    **hints,
)
Return type:

dict[str, Any]

Returns:

Dict with keys

  • "lags" – list of suggested lag values for the lags parameter of DataPipeline. Empty list when no seasonal periods are detected.

  • "lookback_window_size" – max(lags) when lags are available, otherwise n_samples_per_day * 7 as a safe weekly fallback.

  • "integration_order" – integration order d from the stationarity result, for downstream use with TimeSeriesDifferentiator.

  • "max_forecast_horizon" – PredictabilityResult.effective_horizon, the first horizon at which the AMI profile decays to noise level. None when no meaningful AMI signal is detected or the profile does not decay within the computed range.
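
The rules for the four keys above can be written down in a few lines. This is a sketch of the documented behaviour only; `build_hints` and its arguments are hypothetical and stand in for values that the real method reads from the characterisation result.

```python
def build_hints(suggested_lags, n_samples_per_day, integration_order,
                effective_horizon=None):
    """Assemble pipeline hints following the documented fallback rules."""
    lags = list(suggested_lags)
    # Fall back to one week of samples when no lags were suggested.
    lookback = max(lags) if lags else n_samples_per_day * 7
    return {
        "lags": lags,
        "lookback_window_size": lookback,
        "integration_order": integration_order,
        "max_forecast_horizon": effective_horizon,
    }


with_lags = build_hints([48, 96, 336], n_samples_per_day=48, integration_order=0)
without_lags = build_hints([], n_samples_per_day=48, integration_order=1)
```

With half-hourly data the weekly fallback and a detected weekly lag both give a lookback of 336 steps.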

class twiga.core.data.characterisation.StationarityResult(**data)#

Bases: BaseModel

Joint ADF/KPSS stationarity diagnosis.

Parameters:
  • adf_statistic (float) – ADF test statistic (more negative implies stronger evidence of stationarity).

  • adf_pvalue (float) – ADF p-value. Values below alpha reject the unit-root null hypothesis (i.e. the series is stationary).

  • kpss_statistic (float) – KPSS test statistic.

  • kpss_pvalue (float) – KPSS p-value. Values below alpha reject the stationarity null hypothesis (i.e. the series is non-stationary).

  • integration_order (int) – Recommended differencing order d. 0 means operate on levels; 1 means first-difference before modelling.

  • verdict (Literal['stationary', 'non-stationary', 'near-integrated', 'fractional']) – Human-readable joint interpretation of the two tests.

  • recommendation (str) – One-sentence operational guidance derived from verdict.
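
Because the two tests have opposite null hypotheses, their p-values combine into four cases. The sketch below shows one possible mapping onto the four verdict labels. The first two branches follow directly from the parameter descriptions above; assigning "fractional" to the case where both tests reject and "near-integrated" to the case where neither rejects is an assumption made for illustration, as are the function name `joint_verdict` and the default `alpha`.

```python
def joint_verdict(adf_pvalue, kpss_pvalue, alpha=0.05):
    """Combine ADF and KPSS outcomes into a single label (illustrative)."""
    adf_rejects_unit_root = adf_pvalue < alpha
    kpss_rejects_stationarity = kpss_pvalue < alpha
    if adf_rejects_unit_root and not kpss_rejects_stationarity:
        return "stationary"  # both tests agree on stationarity
    if not adf_rejects_unit_root and kpss_rejects_stationarity:
        return "non-stationary"  # both tests agree on a unit root
    if adf_rejects_unit_root and kpss_rejects_stationarity:
        return "fractional"  # assumed label for conflicting rejections
    return "near-integrated"  # assumed label when neither test rejects


verdicts = [
    joint_verdict(0.01, 0.10),
    joint_verdict(0.50, 0.01),
    joint_verdict(0.01, 0.01),
    joint_verdict(0.50, 0.10),
]
```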

adf_pvalue: float#
adf_statistic: float#
integration_order: int#
kpss_pvalue: float#
kpss_statistic: float#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

recommendation: str#
verdict: Literal['stationary', 'non-stationary', 'near-integrated', 'fractional']#
class twiga.core.data.characterisation.ComplexityProfile(**data)#

Bases: BaseModel

Entropy and long-memory profile for a single operating regime.

Parameters:
  • regime (Literal['full', 'stable', 'ramp']) – One of "full", "stable", or "ramp".

  • n_observations (int) – Number of timesteps in the regime window used for the computation.

  • sample_entropy (float) – Sample entropy of the regime series. Lower values indicate more regular, predictable behaviour.

  • permutation_entropy (float) – Normalised permutation entropy.

  • hurst_exponent (float) – Hurst exponent (H). H > 0.5 indicates persistent, trend-following behaviour; H < 0.5 indicates mean-reversion; H ≈ 0.5 is consistent with a random walk.
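
Of the three measures, normalised permutation entropy is the simplest to illustrate. The sketch below counts ordinal patterns of length `order` and divides the Shannon entropy of their distribution by log(order!), so the result lies in [0, 1]. The function name, the default `order=3`, and the unit time delay are assumptions; the library may use different settings.

```python
import math
from collections import Counter

import numpy as np


def permutation_entropy(x, order=3):
    """Normalised permutation entropy with unit time delay."""
    x = np.asarray(x, dtype=float)
    # Each window is reduced to the ordering (argsort) of its values.
    patterns = Counter(
        tuple(np.argsort(x[i:i + order])) for i in range(len(x) - order + 1)
    )
    total = sum(patterns.values())
    probs = np.array([count / total for count in patterns.values()])
    entropy = -(probs * np.log(probs)).sum()
    return float(entropy / math.log(math.factorial(order))) + 0.0


regular = permutation_entropy(np.arange(50))  # a single repeated pattern
noisy = permutation_entropy(np.random.default_rng(0).normal(size=2000))
```

A monotonic ramp yields zero because only one ordinal pattern ever occurs, while white noise approaches one because all patterns are roughly equally likely.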

hurst_exponent: float#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

n_observations: int#
permutation_entropy: float#
regime: Literal['full', 'stable', 'ramp']#
sample_entropy: float#
class twiga.core.data.characterisation.TemporalStructureResult(**data)#

Bases: BaseModel

Lag order and seasonal period estimates from ACF/PACF analysis.

Parameters:
  • ar_order (int) – Suggested AR order from the PACF cutoff (Box-Jenkins methodology).

  • significant_lags (list[int]) – All lags where the PACF is statistically significant outside the alpha-level confidence band.

  • seasonal_periods (list[int]) – Dominant seasonal periods detected as local maxima in the ACF above the confidence band, measured in number of timesteps.

  • dominant_period (int | None) – The lag with the single largest ACF value among seasonal_periods. None if no seasonal periods are detected.

  • suggested_lags (list[int]) – Calendar-aligned lag multiples derived from seasonal_periods and scaled by n_samples_per_day. These are ready for direct use as the lags argument of DataPipeline.

ar_order: int#
dominant_period: int | None#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

seasonal_periods: list[int]#
significant_lags: list[int]#
suggested_lags: list[int]#
class twiga.core.data.characterisation.PredictabilityResult(**data)#

Bases: BaseModel

AMI-based forecastability classification from the horizon-specific profile.

Quantifies how much information the past contains about each future step and how far that information persists across horizons. Two complementary measures drive the classification:

  • ami_h1 – the level anchor: AMI at the shortest horizon (h = 1). A weak h = 1 signal implies low predictability regardless of the decay shape.

  • rel_auc – the persistence measure: mean AMI across all horizons divided by ami_h1. A high rel_auc means the signal decays slowly; a low rel_auc means it collapses immediately after h = 1.

Classification rule (applied in order):

  1. ami_h1 < ami_h1_low → "low" (signal is too weak at h = 1).

  2. rel_auc < rel_auc_low → "low" (signal collapses immediately).

  3. rel_auc >= rel_auc_high → "high" (signal is sustained).

  4. Otherwise → "moderate".
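
The ordered rule above translates directly into a chain of conditionals. In the sketch below the function name `classify_forecastability` is hypothetical and the three threshold defaults are placeholder values chosen for the example, not the library's configured defaults.

```python
def classify_forecastability(ami_h1, rel_auc, ami_h1_low=0.05,
                             rel_auc_low=0.2, rel_auc_high=0.6):
    """Apply the four classification rules in order (thresholds assumed)."""
    if ami_h1 < ami_h1_low:
        return "low"  # rule 1: signal too weak at h = 1
    if rel_auc < rel_auc_low:
        return "low"  # rule 2: signal collapses immediately
    if rel_auc >= rel_auc_high:
        return "high"  # rule 3: signal is sustained
    return "moderate"  # rule 4: everything else


labels = [
    classify_forecastability(ami_h1=0.01, rel_auc=0.9),
    classify_forecastability(ami_h1=0.50, rel_auc=0.1),
    classify_forecastability(ami_h1=0.50, rel_auc=0.7),
    classify_forecastability(ami_h1=0.50, rel_auc=0.4),
]
```

The order matters: a series with a weak h = 1 signal is labelled "low" even when its relative persistence is high.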

Parameters:
  • ami_h1 (float) – AMI at horizon h = 1 in nats. Acts as the level anchor for the classification. Zero when no meaningful signal is detected.

  • auc (float) – Mean AMI across all computed horizons (nats). Equivalent to the area under the AMI profile divided by the number of horizons.

  • rel_auc (float) – auc / ami_h1; zero when ami_h1 is zero. Captures signal persistence independently of signal strength.

  • peak_ami (float) – Maximum AMI value (nats) across all horizons.

  • peak_horizon (int) – Horizon at which the AMI profile is maximised.

  • effective_horizon (int | None) – First horizon h at which the AMI profile drops below ami_noise_floor * ami_h1. None when ami_h1 is zero or the profile never decays to the noise floor within the computed range. Used by CharacterisationResult.to_pipeline_hints() to bound the useful forecast window.

  • n_horizons (int) – Number of horizons included in the AMI profile.

  • label (Literal['low', 'moderate', 'high']) – Forecastability class - "low", "moderate", or "high".
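
Given an AMI profile, the summary fields listed above follow from simple arithmetic. The sketch below assumes the profile is a list whose element i holds the AMI at horizon h = i + 1; `summarise_ami`, the default `ami_noise_floor=0.1`, and the profile values are illustrative assumptions.

```python
def summarise_ami(profile, ami_noise_floor=0.1):
    """Derive the summary fields from an AMI profile (h = 1, 2, ...)."""
    ami_h1 = profile[0]
    auc = sum(profile) / len(profile)  # mean AMI across horizons
    rel_auc = auc / ami_h1 if ami_h1 > 0 else 0.0
    peak_ami = max(profile)
    peak_horizon = profile.index(peak_ami) + 1
    effective_horizon = None
    if ami_h1 > 0:
        for h, value in enumerate(profile, start=1):
            # First horizon whose AMI falls below the noise floor.
            if value < ami_noise_floor * ami_h1:
                effective_horizon = h
                break
    return {
        "ami_h1": ami_h1,
        "auc": auc,
        "rel_auc": rel_auc,
        "peak_ami": peak_ami,
        "peak_horizon": peak_horizon,
        "effective_horizon": effective_horizon,
        "n_horizons": len(profile),
    }


# Illustrative profile that decays with the horizon.
summary = summarise_ami([1.0, 0.5, 0.25, 0.05, 0.01])
```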

ami_h1: float#
auc: float#
effective_horizon: int | None#
label: Literal['low', 'moderate', 'high']#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

n_horizons: int#
peak_ami: float#
peak_horizon: int#
rel_auc: float#

Next: Configuration System | TwigaForecaster | Neural Network Models