This part of the project documentation takes an information-oriented approach: use it as a reference for the technical implementation of the mlpForecaster project code.

MLPForecast

MLPForecast(
    hparams: dict,
    exp_name: str = "Tanesco",
    file_name: str = None,
    seed: int = 42,
    root_dir: str = "../",
    trial=None,
    metric: str = "val_mae",
    max_epochs: int = 10,
    wandb: bool = False,
    model_type: str = "MLPF",
    gradient_clip_val: float = 10.0,
    rich_progress_bar: bool = True,
)

Bases: PytorchForecast

MLP Forecasting class for managing training, evaluation, and prediction.

Attributes:

  • hparams (dict) –

    Hyperparameters for the MLP model.

  • model (MLPForecastModel) –

    PyTorch model.

  • train_df (DataFrame) –

    Training DataFrame.

  • validation_df (DataFrame) –

    Validation DataFrame.

Parameters:

  • hparams (dict) –

    Hyperparameters for the MLP model.

  • exp_name (str, default: 'Tanesco' ) –

    Experiment name. Defaults to "Tanesco".

  • file_name (str, default: None ) –

    Name of the file for logging and saving checkpoints. Defaults to None.

  • seed (int, default: 42 ) –

    Random seed for reproducibility. Defaults to 42.

  • root_dir (str, default: '../' ) –

    Root directory for the project. Defaults to "../".

  • trial (Trial, default: None ) –

    Optuna trial object for hyperparameter optimization. Defaults to None.

  • metric (str, default: 'val_mae' ) –

    Metric to monitor during training. Defaults to "val_mae".

  • max_epochs (int, default: 10 ) –

    Maximum number of epochs for training. Defaults to 10.

  • wandb (bool, default: False ) –

    Whether to use Weights and Biases for logging. Defaults to False.

  • model_type (str, default: 'MLPF' ) –

    Type of the model. Defaults to "MLPF".

  • gradient_clip_val (float, default: 10.0 ) –

    Value for gradient clipping. Defaults to 10.0.

  • rich_progress_bar (bool, default: True ) –

    Whether to use rich progress bar. Defaults to True.

Source code in mlpforecast/forecaster/mlp.py
def __init__(
    self,
    hparams: dict,
    exp_name: str = "Tanesco",
    file_name: str = None,
    seed: int = 42,
    root_dir: str = "../",
    trial=None,
    metric: str = "val_mae",
    max_epochs: int = 10,
    wandb: bool = False,
    model_type: str = "MLPF",
    gradient_clip_val: float = 10.0,
    rich_progress_bar: bool = True,
):
    """
    MLP Forecasting class for managing training, evaluation, and prediction.

    Args:
        hparams (dict): Hyperparameters for the MLP model.
        exp_name (str, optional): Experiment name. Defaults to "Tanesco".
        file_name (str, optional): Name of the file for logging and saving checkpoints. Defaults to None.
        seed (int, optional): Random seed for reproducibility. Defaults to 42.
        root_dir (str, optional): Root directory for the project. Defaults to "../".
        trial (optuna.trial.Trial, optional): Optuna trial object for hyperparameter optimization. Defaults to None.
        metric (str, optional): Metric to monitor during training. Defaults to "val_mae".
        max_epochs (int, optional): Maximum number of epochs for training. Defaults to 10.
        wandb (bool, optional): Whether to use Weights and Biases for logging. Defaults to False.
        model_type (str, optional): Type of the model. Defaults to "MLPF".
        gradient_clip_val (float, optional): Value for gradient clipping. Defaults to 10.0.
        rich_progress_bar (bool, optional): Whether to use rich progress bar. Defaults to True.
    """
    super().__init__(
        file_name=file_name,
        seed=seed,
        root_dir=root_dir,
        trial=trial,
        metric=metric,
        max_epochs=max_epochs,
        wandb=wandb,
        model_type=model_type,
        gradient_clip_val=gradient_clip_val,
        rich_progress_bar=rich_progress_bar,
    )
    self.hparams = hparams
    self.model = MLPForecastModel(**hparams)

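A minimal usage sketch follows. The hyperparameter keys shown are illustrative assumptions inferred from the search space and attribute accesses elsewhere on this page; consult MLPForecastModel for the authoritative set.

from mlpforecast.forecaster.mlp import MLPForecast

# Hypothetical hyperparameters; the exact keys are defined by MLPForecastModel.
hparams = {
    "input_window_size": 96,    # one day of 15-minute readings (assumed)
    "forecast_horizon": 96,
    "hidden_size": 256,
    "num_layers": 2,
    "dropout_rate": 0.25,
    "metric": "mae",            # monitored as "val_mae" / "train_mae" during fit
}

forecaster = MLPForecast(
    hparams,
    exp_name="Tanesco",
    file_name="mlpf_baseline",
    max_epochs=10,
)
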
auto_tune

auto_tune(
    train_df,
    val_df,
    num_trial=10,
    reduction_factor=3,
    patience=2,
)

Perform hyperparameter tuning using Optuna.

Parameters:

  • train_df (DataFrame) –

    Training DataFrame.

  • val_df (DataFrame) –

    Validation DataFrame.

  • num_trial (int, default: 10 ) –

    Number of trials for hyperparameter optimization. Defaults to 10.

  • reduction_factor (int, default: 3 ) –

    Reduction factor for Hyperband pruner. Defaults to 3.

  • patience (int, default: 2 ) –

    Patience for the PatientPruner. Defaults to 2.

Source code in mlpforecast/forecaster/mlp.py
def auto_tune(self, train_df, val_df, num_trial=10, reduction_factor=3, patience=2):
    """
    Perform hyperparameter tuning using Optuna.

    Args:
        train_df (pd.DataFrame): Training DataFrame.
        val_df (pd.DataFrame): Validation DataFrame.
        num_trial (int, optional): Number of trials for hyperparameter optimization. Defaults to 10.
        reduction_factor (int, optional): Reduction factor for Hyperband pruner. Defaults to 3.
        patience (int, optional): Patience for the PatientPruner. Defaults to 2.
    """
    self.train_df = train_df
    self.validation_df = val_df

    def print_callback(study, trial):
        logging.info(
            f"""Trial No: {trial.number}, Current value: {trial.value}, Current params: {trial.params}"""
        )
        logging.info(
            f"""Best value: {study.best_value}, Best params: {study.best_trial.params}"""
        )

    def objective(trial):
        params = self.get_search_params(trial)

        self.hparams.update(params)
        model = MLPForecast(
            self.hparams,
            exp_name=f"{self.exp_name}",
            seed=42,
            trial=trial,
            rich_progress_bar=True,
            file_name=trial.number,
        )

        val_cost = model.fit(self.train_df, self.validation_df)
        return val_cost

    study_name = f"{self.exp_name}_{self.model_type}"
    storage_name = f"sqlite:///{study_name}.db"
    base_pruner = optuna.pruners.HyperbandPruner(
        min_resource=1, max_resource="auto", reduction_factor=reduction_factor
    )
    pruner = optuna.pruners.PatientPruner(
        base_pruner, patience=patience, min_delta=0.0
    )
    study = optuna.create_study(
        direction="minimize",
        pruner=pruner,
        study_name=study_name,
        storage=storage_name,
        load_if_exists=True,
    )
    study.optimize(
        objective,
        n_trials=num_trial,
        callbacks=[print_callback],
    )
    self.hparams.update(study.best_trial.params)
    np.save(f"{self.results_path}/best_params.npy", study.best_trial.params)

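A hedged usage sketch, assuming df is a pandas DataFrame with the date column and target series expected by the data pipeline (the file name and split below are hypothetical):

import pandas as pd

df = pd.read_csv("load.csv", parse_dates=["timestamp"])    # hypothetical input file
train_df, val_df = df.iloc[:-30 * 96], df.iloc[-30 * 96:]  # hold out the last 30 days

forecaster.auto_tune(train_df, val_df, num_trial=20)
# The best trial's parameters are merged into forecaster.hparams and saved to
# best_params.npy under the results path, as shown in the source above.
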
create_results_df

create_results_df(
    time_stamp,
    ground_truth,
    predictions,
    target_series,
    date_column,
)

Creates a DataFrame with the timestamp index and populates it with ground truth and forecasted values.

Parameters:

  • time_stamp (array) –

    The timestamp index.

  • ground_truth (array) –

    The ground truth values.

  • predictions (array) –

    The forecasted values.

  • target_series (list) –

    A list of target series names.

  • date_column (str) –

    The name of the date column.

Returns:

  • results_df ( DataFrame ) –

    A DataFrame containing the ground truth and forecasted values, indexed by timestamp.

Source code in mlpforecast/forecaster/common.py
def create_results_df(
    self, time_stamp, ground_truth, predictions, target_series, date_column
):
    """
    Creates a DataFrame with the timestamp index and populates it with ground truth and forecasted values.

    Args:
        time_stamp (np.array): The timestamp index.
        ground_truth (np.array): The ground truth values.
        predictions (np.array): The forecasted values.
        target_series (list): A list of target series names.
        date_column (str): The name of the date column.

    Returns:
        results_df (pd.DataFrame): A DataFrame containing the ground truth and forecasted values, indexed by timestamp.

    """
    results_df = pd.DataFrame(index=pd.to_datetime(time_stamp.flatten(), unit="ns"))
    results_df.index.name = date_column

    for target_idx, target_name in enumerate(target_series):
        results_df[target_name] = ground_truth[:, :, target_idx].flatten()
        results_df[f"{target_name}_forecast"] = predictions[
            :, :, target_idx
        ].flatten()

    return results_df

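The method expects windowed arrays of shape (N, T, C): N forecast windows, T steps per horizon, and C target series, with timestamps given as nanosecond integers (matching the pd.to_numeric conversion in load_and_prepare_data). A minimal sketch with synthetic data and a single assumed target named "load":

import numpy as np
import pandas as pd

N, T, C = 2, 3, 1  # two windows, three steps, one target
time_stamp = (
    pd.date_range("2024-01-01", periods=N * T, freq="h")
    .values.astype("int64")
    .reshape(N, T)
)
ground_truth = np.arange(N * T * C, dtype=float).reshape(N, T, C)
predictions = ground_truth + 0.5

results_df = forecaster.create_results_df(
    time_stamp,
    ground_truth,
    predictions,
    target_series=["load"],
    date_column="timestamp",
)
# results_df has columns "load" and "load_forecast", indexed by timestamp.
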
evaluate_point_forecast

evaluate_point_forecast(ground_truth, pred, time_stamp)

Evaluates the point forecast.

Parameters:

  • ground_truth (array) –

    The ground truth values.

  • pred (array) –

    The forecasted values.

  • time_stamp (array) –

    The timestamp index.

Returns:

  • dict

    A dictionary containing the evaluation metrics.

Source code in mlpforecast/forecaster/common.py
def evaluate_point_forecast(self, ground_truth, pred, time_stamp):
    """
    Evaluates the point forecast.

    Args:
        ground_truth (np.array): The ground truth values.
        pred (np.array): The forecasted values.
        time_stamp (np.array): The timestamp index.

    Returns:
        (dict): A dictionary containing the evaluation metrics.

    """
    return evaluate_point_forecast({
        "true": ground_truth,
        "loc": pred,
        "index": time_stamp,
        "targets": self.model.data_pipeline.target_series,
    })

fit

fit(
    train_df,
    val_df=None,
    train_ratio=0.8,
    drop_last=True,
    num_worker=1,
    batch_size=64,
    pin_memory=True,
)

Fit the model using the provided training DataFrame.

Parameters:

  • train_df (DataFrame) –

    The training data.

  • val_df (DataFrame, default: None ) –

    The validation data. If None, will split train_df based on train_ratio.

  • train_ratio (float, default: 0.8 ) –

    Proportion of data to use for training. Default is 0.80.

  • drop_last (bool, default: True ) –

    Whether to drop the last incomplete batch. Default is True.

  • num_worker (int, default: 1 ) –

    Number of workers for data loading. Default is 1.

  • batch_size (int, default: 64 ) –

    Size of each batch for training. Default is 64.

  • pin_memory (bool, default: True ) –

    If True, the data loader will copy Tensors into CUDA pinned memory. Default is True.

Returns:

  • float

    The training wall time or cost metric based on the training configuration.

Raises:

  • ValueError

    If the model instance is not initialized.

Source code in mlpforecast/forecaster/common.py
def fit(
    self,
    train_df,
    val_df=None,
    train_ratio=0.80,
    drop_last=True,
    num_worker=1,
    batch_size=64,
    pin_memory=True,
):
    """
    Fit the model using the provided training DataFrame.

    Args:
        train_df (pd.DataFrame): The training data.
        val_df (pd.DataFrame, optional): The validation data. If None, will split train_df based on train_ratio.
        train_ratio (float, optional): Proportion of data to use for training. Default is 0.80.
        drop_last (bool, optional): Whether to drop the last incomplete batch. Default is True.
        num_worker (int, optional): Number of workers for data loading. Default is 1.
        batch_size (int, optional): Size of each batch for training. Default is 64.
        pin_memory (bool, optional): \
            If True, the data loader will copy Tensors into CUDA pinned memory. Default is True.

    Returns:
        (float): The training wall time or cost metric based on the training configuration.

    Raises:
        ValueError: If the model instance is not initialized.
    """

    # Check if the model instance is initialized
    if self.model is None:
        raise ValueError("Model instance is empty")

    # Set the checkpoint path for the model
    self.model.checkpoint_path = self.checkpoints

    # Fit the data pipeline using the training DataFrame
    self.model.data_pipeline.fit(train_df.copy())

    # If no validation DataFrame is provided, split the training DataFrame

    if val_df is None and train_ratio > 0.0:
        self.train_df = train_df.copy()
        train_df, val_df = (
            train_df.iloc[: int(train_ratio * len(train_df))],
            train_df.iloc[int(train_ratio * len(train_df)) :],
        )
        self.metric = f"val_{self.model.hparams['metric']}"
        val_feature, val_target = self.model.data_pipeline.transform(val_df)

    else:
        val_feature, val_target = None, None
        self.metric = f"train_{self.model.hparams['metric']}"
        self.train_df = pd.concat([train_df.copy(), val_df.copy()], axis=0)
    # Transform the training data into features and targets
    train_feature, train_target = self.model.data_pipeline.transform(train_df)

    # Initialize the TimeseriesDataModule
    self.datamodule = TimeseriesDataModule(
        train_inputs=train_feature,
        train_targets=train_target,
        val_inputs=val_feature,
        val_targets=val_target,
        drop_last=drop_last,
        num_worker=num_worker,
        batch_size=batch_size,
        pin_memory=pin_memory,
    )

    # Sort the training DataFrame by the specified date column
    self.train_df = self.train_df.sort_values(
        by=self.model.data_pipeline.date_column
    )

    # Select the last 'max_data_drop' rows plus 'input_window_size' rows
    data_drop = self.model.data_pipeline.max_data_drop
    input_window = self.model.data_pipeline.input_window_size

    self.train_df = self.train_df.iloc[-(data_drop + input_window) :]

    # Set up the trainer
    self._set_up_trainer()
    start_time = default_timer()
    logger.info("""---------------Training started ---------------------------""")

    # Train the model and log training duration

    self.trainer.fit(
        self.model,
        self.datamodule.train_dataloader(),
        self.datamodule.val_dataloader(),
        ckpt_path=get_latest_checkpoint(self.checkpoints),
    )

    self.train_walltime = default_timer() - start_time
    logging.info(f"Training complete after {self.train_walltime / 60:.2f} minutes")

    if self.trial is not None:
        # Make predictions and compute the cost metric for hyper-param optimisation
        self.predict(val_df)
        cost = self.metrics.groupby("target")["MAE"].mean().iloc[-1].round(2)

        return cost  # Return the computed cost metric
    else:
        return self.train_walltime

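A usage sketch, assuming forecaster was constructed as in the example above and train_df holds the full history; with val_df=None, the last 20% of rows is held out for validation:

walltime = forecaster.fit(train_df, val_df=None, train_ratio=0.80, batch_size=64)
# Outside an Optuna trial, fit returns the training wall time in seconds;
# inside a trial it returns the validation MAE used as the optimisation cost.
print(f"training took {walltime / 60:.1f} min")
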
get_search_params

get_search_params(trial: Trial) -> dict

Define the search space for hyperparameter optimization using Optuna.

Parameters:

  • trial (Trial) –

    An Optuna trial object to suggest parameters.

Returns:

  • dict ( dict ) –

    A dictionary containing suggested hyperparameters.

Source code in mlpforecast/forecaster/mlp.py
def get_search_params(self, trial: Trial) -> dict:
    """
    Define the search space for hyperparameter optimization using Optuna.

    Args:
        trial: An Optuna trial object to suggest parameters.

    Returns:
        dict: A dictionary containing suggested hyperparameters.
    """
    params = {}

    # Define integer hyperparameters
    params["embedding_size"] = trial.suggest_int("embedding_size", 8, 64, step=2)
    params["hidden_size"] = trial.suggest_int("hidden_size", 8, 512, step=2)
    params["num_layers"] = trial.suggest_int("num_layers", 1, 5)
    params["expansion_factor"] = trial.suggest_int("expansion_factor", 1, 4)

    # Define categorical hyperparameters
    params["embedding_type"] = trial.suggest_categorical(
        "embedding_type", [None, "PosEmb", "RotaryEmb", "CombinedEmb"]
    )
    params["combination_type"] = trial.suggest_categorical(
        "combination_type", ["addition-comb", "weighted-comb"]
    )
    params["residual"] = trial.suggest_categorical("residual", [True, False])
    params["activation_function"] = trial.suggest_categorical(
        "activation_function", ACTIVATIONS
    )
    params["out_activation_function"] = trial.suggest_categorical(
        "out_activation_function", ACTIVATIONS
    )

    # Define float hyperparameters
    params["dropout_rate"] = trial.suggest_float(
        "dropout_rate", 0.0, 0.9, step=0.05
    )
    params["alpha"] = trial.suggest_float("alpha", 0.0, 1, step=0.05)

    return params

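To change the search space, one option is to subclass MLPForecast and override get_search_params; the ranges below are arbitrary illustrations, not recommendations:

from optuna import Trial

class SmallSearchMLPForecast(MLPForecast):
    def get_search_params(self, trial: Trial) -> dict:
        # Replace the parent search space entirely (illustrative ranges).
        return {
            "hidden_size": trial.suggest_int("hidden_size", 32, 128, step=2),
            "num_layers": trial.suggest_int("num_layers", 1, 3),
            "dropout_rate": trial.suggest_float("dropout_rate", 0.0, 0.5, step=0.05),
        }
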
load_and_prepare_data

load_and_prepare_data(
    test_df: DataFrame, daily_feature: str
)

Loads the checkpoint and prepares the ground truth data.

Parameters:

  • test_df (DataFrame) –

    The test DataFrame containing the input features for prediction.

  • daily_feature (str) –

    The daily feature to use in the model.

Returns:

  • ground_truth ( DataFrame ) –

    A DataFrame containing the ground truth data.

Source code in mlpforecast/forecaster/common.py
def load_and_prepare_data(self, test_df: pd.DataFrame, daily_feature: str):
    """
    Loads the checkpoint and prepares the ground truth data.

    Args:
        test_df (pd.DataFrame): The test DataFrame containing the input features for prediction.
        daily_feature (str): The daily feature to use in the model.

    Returns:
        ground_truth (pd.DataFrame): A DataFrame containing the ground truth data.
    """
    self.load_checkpoint()
    self.model.data_pipeline.daily_features = daily_feature

    # Prepare ground truth data
    ground_truth = test_df.iloc[self.model.data_pipeline.max_data_drop :].copy()
    ground_truth[self.model.data_pipeline.date_column] = pd.to_numeric(
        ground_truth[self.model.data_pipeline.date_column]
    )
    return ground_truth

load_checkpoint

load_checkpoint()

Load the latest checkpoint for the model.

This method retrieves the path of the latest checkpoint and loads the model from it.

Source code in mlpforecast/forecaster/mlp.py
def load_checkpoint(self):
    """
    Load the latest checkpoint for the model.

    This method retrieves the path of the latest checkpoint and loads the model from it.
    """
    path_best_model = get_latest_checkpoint(self.checkpoints)
    self.model = MLPForecastModel.load_from_checkpoint(path_best_model)
    self.model.eval()

perform_prediction

perform_prediction(test_df: DataFrame)

Performs the model prediction.

Parameters:

  • test_df (DataFrame) –

    The test DataFrame containing the input features for prediction.

Returns:

  • dict

    A dictionary containing the forecasted values.

Source code in mlpforecast/forecaster/common.py
def perform_prediction(self, test_df: pd.DataFrame):
    """
    Performs the model prediction.

    Args:
        test_df (pd.DataFrame): The test DataFrame containing the input features for prediction.

    Returns:
        (dict): A dictionary containing the forecasted values.

    """
    features, _ = self.model.data_pipeline.transform(test_df.copy())
    features = torch.FloatTensor(features.copy())
    self.model.to(features.device)
    self.model.eval()
    return self.model.forecast(features)

predict

predict(
    test_df=None, covariate_df=None, daily_feature=True
)

Perform prediction on the test DataFrame and return a DataFrame with ground truth and forecasted values.

Parameters:

  • test_df (DataFrame, default: None ) –

    The test DataFrame containing the input features for prediction.

  • covariate_df (DataFrame, default: None ) –

    Optional covariate DataFrame; accepted by the signature but not used by the current implementation.

  • daily_feature (bool, default: True ) –

    Flag indicating whether daily features are used in the model. Default is True.

Returns:

  • results_df ( DataFrame ) –

    A DataFrame containing the ground truth and forecasted values, indexed by timestamp.

Source code in mlpforecast/forecaster/common.py
def predict(self, test_df=None, covariate_df=None, daily_feature=True):
    """
    Perform prediction on the test DataFrame and return a DataFrame with ground truth and forecasted values.

    Args:
        test_df (pd.DataFrame): The test DataFrame containing the input features for prediction.
        covariate_df (pd.DataFrame, optional): Optional covariates; accepted but not used by the current implementation.
        daily_feature (bool): Flag indicating whether daily features are used in the model. Default is True.

    Returns:
        results_df (pd.DataFrame): A DataFrame containing the ground truth and forecasted values, indexed by timestamp.
    """
    if (test_df is not None) and (self.train_df is not None):
        test_df = pd.concat([self.train_df, test_df], axis=0)
    else:
        raise ValueError("test_df cannot be None")
    test_df = test_df.sort_values(by=self.model.data_pipeline.date_column)

    ground_truth = self.load_and_prepare_data(test_df, daily_feature)
    time_stamp = ground_truth[[self.model.data_pipeline.date_column]].values
    ground_truth = ground_truth[self.model.data_pipeline.target_series].values

    time_stamp = format_target(
        time_stamp,
        self.model.hparams["input_window_size"],
        self.model.hparams["forecast_horizon"],
        daily_feature=self.model.data_pipeline.daily_features,
    )
    ground_truth = format_target(
        ground_truth,
        self.model.hparams["input_window_size"],
        self.model.hparams["forecast_horizon"],
        daily_feature=self.model.data_pipeline.daily_features,
    )

    start_time = default_timer()
    pred = self.perform_prediction(test_df)
    self.test_walltime = default_timer() - start_time

    # Inverse transform predictions
    scaler = self.model.data_pipeline.data_pipeline.named_steps["scaling"]
    target_scaler = scaler.named_transformers_["target_scaler"]

    N, T, C = pred["pred"].size()
    pred["pred"] = target_scaler.inverse_transform(
        pred["pred"].numpy().reshape(N * T, C)
    )
    pred["pred"] = pred["pred"].reshape(N, T, C)

    # Evaluate point forecast
    self.metrics = self.evaluate_point_forecast(
        ground_truth, pred["pred"], time_stamp
    )
    self.metrics["test-time"] = self.test_walltime
    self.metrics["Model"] = self.model_type.upper()

    # Assert that the prediction and ground truth shapes are the same
    assert (
        pred["pred"].shape == ground_truth.shape
    ), "Shape mismatch: pred['pred'] and ground_truth must have the same shape."

    # Create results DataFrame
    results_df = self.create_results_df(
        time_stamp,
        ground_truth,
        pred["pred"],
        self.model.data_pipeline.target_series,
        self.model.data_pipeline.date_column,
    )
    results_df["Model"] = self.model_type.upper()

    return results_df

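A usage sketch, assuming the forecaster has been fitted so that train_df is cached and a checkpoint exists; "load" stands in for whatever target series the data pipeline is configured with:

results_df = forecaster.predict(test_df)
print(forecaster.metrics.head())                      # point-forecast metrics computed during predict
print(results_df[["load", "load_forecast"]].head())   # ground truth vs. forecast
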
PytorchForecast

PytorchForecast(
    exp_name="Tanesco",
    file_name=None,
    seed=42,
    root_dir="../",
    trial=None,
    metric="val_mae",
    max_epochs=10,
    wandb=False,
    model_type="MLPF",
    rich_progress_bar=False,
    gradient_clip_val=10,
)

PytorchForecast class for setting up and managing the training process of a PyTorch model using PyTorch Lightning.

Attributes:

  • exp_name (str) –

    The name of the experiment.

  • file_name (str) –

    The name of the file to save the logs and model checkpoints.

  • seed (int) –

    The seed for random number generation to ensure reproducibility.

  • root_dir (str) –

    The root directory for saving logs and checkpoints.

  • trial (Trial) –

    Optuna trial for hyperparameter optimization.

  • metric (str) –

    The metric to monitor for early stopping and model checkpointing.

  • max_epochs (int) –

    The maximum number of training epochs.

  • wandb (bool) –

    Flag to use Wandb logger instead of TensorBoard logger.

  • rich_progress_bar (bool) –

    Flag to use rich progress bar for training visualization.

Parameters:

  • exp_name (str, default: 'Tanesco' ) –

    The name of the experiment. Defaults to "Tanesco".

  • file_name (str, default: None ) –

    The name of the file to save the logs and model checkpoints. Defaults to None.

  • seed (int, default: 42 ) –

    The seed for random number generation. Defaults to 42.

  • root_dir (str, default: '../' ) –

    The root directory for saving logs and checkpoints. Defaults to "../".

  • trial (Trial, default: None ) –

    Optuna trial for hyperparameter optimization. Defaults to None.

  • metric (str, default: 'val_mae' ) –

    The metric to monitor for early stopping and model checkpointing. Defaults to "val_mae".

  • max_epochs (int, default: 10 ) –

    The maximum number of training epochs. Defaults to 10.

  • wandb (bool, default: False ) –

    Flag to use Wandb logger instead of TensorBoard logger. Defaults to False.

  • rich_progress_bar (bool, default: False ) –

    Flag to use rich progress bar for training visualization. Defaults to False.

  • model_type (str, default: 'MLPF' ) –

    Type of the model. Defaults to "MLPF".

  • gradient_clip_val (float, default: 10 ) –

    Value for gradient clipping. Defaults to 10.

Source code in mlpforecast/forecaster/common.py
def __init__(
    self,
    exp_name="Tanesco",
    file_name=None,
    seed=42,
    root_dir="../",
    trial=None,
    metric="val_mae",
    max_epochs=10,
    wandb=False,
    model_type="MLPF",
    rich_progress_bar=False,
    gradient_clip_val=10,
):
    """
    Initializes the PytorchForecast class with the given parameters.

    Args:
        exp_name (str): The name of the experiment. Defaults to "Tanesco".
        file_name (str): The name of the file to save the logs and model checkpoints. Defaults to None.
        seed (int): The seed for random number generation. Defaults to 42.
        root_dir (str): The root directory for saving logs and checkpoints. Defaults to "../".
        trial (optuna.trial.Trial): Optuna trial for hyperparameter optimization. Defaults to None.
        metric (str): The metric to monitor for early stopping and model checkpointing. Defaults to "val_mae".
        max_epochs (int): The maximum number of training epochs. Defaults to 10.
        wandb (bool): Flag to use Wandb logger instead of TensorBoard logger. Defaults to False.
        model_type (str): Type of the model. Defaults to "MLPF".
        rich_progress_bar (bool): Flag to use rich progress bar for training visualization. Defaults to False.
        gradient_clip_val (float): Value for gradient clipping. Defaults to 10.
    """
    super().__init__()

    self.exp_name = exp_name
    self.file_name = file_name
    self.seed = seed
    self.root_dir = root_dir
    self.trial = trial
    self.metric = metric
    self.max_epochs = max_epochs
    self.wandb = wandb
    self.model_type = model_type
    self.rich_progress_bar = rich_progress_bar
    self.model = None
    self.train_df = None
    self.datamodule = None
    self.gradient_clip_val = gradient_clip_val
    self._create_folder()

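PytorchForecast is not meant to be used directly; subclasses attach a concrete model, as MLPForecast does above. A minimal sketch of that pattern, where MyModel is a hypothetical stand-in for any compatible LightningModule:

class MyForecast(PytorchForecast):
    def __init__(self, hparams: dict, **kwargs):
        super().__init__(model_type="MyModel", **kwargs)
        self.hparams = hparams
        self.model = MyModel(**hparams)  # hypothetical model class
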
create_results_df

create_results_df(
    time_stamp,
    ground_truth,
    predictions,
    target_series,
    date_column,
)

Creates a DataFrame with the timestamp index and populates it with ground truth and forecasted values.

Parameters:

  • time_stamp (array) –

    The timestamp index.

  • ground_truth (array) –

    The ground truth values.

  • predictions (array) –

    The forecasted values.

  • target_series (list) –

    A list of target series names.

  • date_column (str) –

    The name of the date column.

Returns:

  • results_df ( DataFrame ) –

    A DataFrame containing the ground truth and forecasted values, indexed by timestamp.

Source code in mlpforecast/forecaster/common.py
def create_results_df(
    self, time_stamp, ground_truth, predictions, target_series, date_column
):
    """
    Creates a DataFrame with the timestamp index and populates it with ground truth and forecasted values.

    Args:
        time_stamp (np.array): The timestamp index.
        ground_truth (np.array): The ground truth values.
        predictions (np.array): The forecasted values.
        target_series (list): A list of target series names.
        date_column (str): The name of the date column.

    Returns:
        results_df (pd.DataFrame): A DataFrame containing the ground truth and forecasted values, indexed by timestamp.

    """
    results_df = pd.DataFrame(index=pd.to_datetime(time_stamp.flatten(), unit="ns"))
    results_df.index.name = date_column

    for target_idx, target_name in enumerate(target_series):
        results_df[target_name] = ground_truth[:, :, target_idx].flatten()
        results_df[f"{target_name}_forecast"] = predictions[
            :, :, target_idx
        ].flatten()

    return results_df

evaluate_point_forecast

evaluate_point_forecast(ground_truth, pred, time_stamp)

Evaluates the point forecast.

Parameters:

  • ground_truth (array) –

    The ground truth values.

  • pred (array) –

    The forecasted values.

  • time_stamp (array) –

    The timestamp index.

Returns:

  • dict

    A dictionary containing the evaluation metrics.

Source code in mlpforecast/forecaster/common.py
def evaluate_point_forecast(self, ground_truth, pred, time_stamp):
    """
    Evaluates the point forecast.

    Args:
        ground_truth (np.array): The ground truth values.
        pred (np.array): The forecasted values.
        time_stamp (np.array): The timestamp index.

    Returns:
        (dict): A dictionary containing the evaluation metrics.

    """
    return evaluate_point_forecast({
        "true": ground_truth,
        "loc": pred,
        "index": time_stamp,
        "targets": self.model.data_pipeline.target_series,
    })

fit

fit(
    train_df,
    val_df=None,
    train_ratio=0.8,
    drop_last=True,
    num_worker=1,
    batch_size=64,
    pin_memory=True,
)

Fit the model using the provided training DataFrame.

Parameters:

  • train_df (DataFrame) –

    The training data.

  • val_df (DataFrame, default: None ) –

    The validation data. If None, will split train_df based on train_ratio.

  • train_ratio (float, default: 0.8 ) –

    Proportion of data to use for training. Default is 0.80.

  • drop_last (bool, default: True ) –

    Whether to drop the last incomplete batch. Default is True.

  • num_worker (int, default: 1 ) –

    Number of workers for data loading. Default is 1.

  • batch_size (int, default: 64 ) –

    Size of each batch for training. Default is 64.

  • pin_memory (bool, default: True ) –

    If True, the data loader will copy Tensors into CUDA pinned memory. Default is True.

Returns:

  • float

    The training wall time or cost metric based on the training configuration.

Raises:

  • ValueError

    If the model instance is not initialized.

Source code in mlpforecast/forecaster/common.py
def fit(
    self,
    train_df,
    val_df=None,
    train_ratio=0.80,
    drop_last=True,
    num_worker=1,
    batch_size=64,
    pin_memory=True,
):
    """
    Fit the model using the provided training DataFrame.

    Args:
        train_df (pd.DataFrame): The training data.
        val_df (pd.DataFrame, optional): The validation data. If None, will split train_df based on train_ratio.
        train_ratio (float, optional): Proportion of data to use for training. Default is 0.80.
        drop_last (bool, optional): Whether to drop the last incomplete batch. Default is True.
        num_worker (int, optional): Number of workers for data loading. Default is 1.
        batch_size (int, optional): Size of each batch for training. Default is 64.
        pin_memory (bool, optional): \
            If True, the data loader will copy Tensors into CUDA pinned memory. Default is True.

    Returns:
        (float): The training wall time or cost metric based on the training configuration.

    Raises:
        ValueError: If the model instance is not initialized.
    """

    # Check if the model instance is initialized
    if self.model is None:
        raise ValueError("Model instance is empty")

    # Set the checkpoint path for the model
    self.model.checkpoint_path = self.checkpoints

    # Fit the data pipeline using the training DataFrame
    self.model.data_pipeline.fit(train_df.copy())

    # If no validation DataFrame is provided, split the training DataFrame

    if val_df is None and train_ratio > 0.0:
        self.train_df = train_df.copy()
        train_df, val_df = (
            train_df.iloc[: int(train_ratio * len(train_df))],
            train_df.iloc[int(train_ratio * len(train_df)) :],
        )
        self.metric = f"val_{self.model.hparams['metric']}"
        val_feature, val_target = self.model.data_pipeline.transform(val_df)

    else:
        val_feature, val_target = None, None
        self.metric = f"train_{self.model.hparams['metric']}"
        self.train_df = pd.concat([train_df.copy(), val_df.copy()], axis=0)
    # Transform the training data into features and targets
    train_feature, train_target = self.model.data_pipeline.transform(train_df)

    # Initialize the TimeseriesDataModule
    self.datamodule = TimeseriesDataModule(
        train_inputs=train_feature,
        train_targets=train_target,
        val_inputs=val_feature,
        val_targets=val_target,
        drop_last=drop_last,
        num_worker=num_worker,
        batch_size=batch_size,
        pin_memory=pin_memory,
    )

    # Sort the training DataFrame by the specified date column
    self.train_df = self.train_df.sort_values(
        by=self.model.data_pipeline.date_column
    )

    # Select the last 'max_data_drop' rows plus 'input_window_size' rows
    data_drop = self.model.data_pipeline.max_data_drop
    input_window = self.model.data_pipeline.input_window_size

    self.train_df = self.train_df.iloc[-(data_drop + input_window) :]

    # Set up the trainer
    self._set_up_trainer()
    start_time = default_timer()
    logger.info("""---------------Training started ---------------------------""")

    # Train the model and log training duration

    self.trainer.fit(
        self.model,
        self.datamodule.train_dataloader(),
        self.datamodule.val_dataloader(),
        ckpt_path=get_latest_checkpoint(self.checkpoints),
    )

    self.train_walltime = default_timer() - start_time
    logging.info(f"Training complete after {self.train_walltime / 60:.2f} minutes")

    if self.trial is not None:
        # Make predictions and compute the cost metric for hyper-param optimisation
        self.predict(val_df)
        cost = self.metrics.groupby("target")["MAE"].mean().iloc[-1].round(2)

        return cost  # Return the computed cost metric
    else:
        return self.train_walltime

load_and_prepare_data

load_and_prepare_data(
    test_df: DataFrame, daily_feature: str
)

Loads the checkpoint and prepares the ground truth data.

Parameters:

  • test_df (DataFrame) –

    The test DataFrame containing the input features for prediction.

  • daily_feature (str) –

    The daily feature to use in the model.

Returns:

  • ground_truth ( DataFrame ) –

    A DataFrame containing the ground truth data.

Source code in mlpforecast/forecaster/common.py
def load_and_prepare_data(self, test_df: pd.DataFrame, daily_feature: str):
    """
    Loads the checkpoint and prepares the ground truth data.

    Args:
        test_df (pd.DataFrame): The test DataFrame containing the input features for prediction.
        daily_feature (str): The daily feature to use in the model.

    Returns:
        ground_truth (pd.DataFrame): A DataFrame containing the ground truth data.
    """
    self.load_checkpoint()
    self.model.data_pipeline.daily_features = daily_feature

    # Prepare ground truth data
    ground_truth = test_df.iloc[self.model.data_pipeline.max_data_drop :].copy()
    ground_truth[self.model.data_pipeline.date_column] = pd.to_numeric(
        ground_truth[self.model.data_pipeline.date_column]
    )
    return ground_truth

perform_prediction

perform_prediction(test_df: DataFrame)

Performs the model prediction.

Parameters:

  • test_df (DataFrame) –

    The test DataFrame containing the input features for prediction.

Returns:

  • dict

    A dictionary containing the forecasted values.

Source code in mlpforecast/forecaster/common.py
def perform_prediction(self, test_df: pd.DataFrame):
    """
    Performs the model prediction.

    Args:
        test_df (pd.DataFrame): The test DataFrame containing the input features for prediction.

    Returns:
        (dict): A dictionary containing the forecasted values.

    """
    features, _ = self.model.data_pipeline.transform(test_df.copy())
    features = torch.FloatTensor(features.copy())
    self.model.to(features.device)
    self.model.eval()
    return self.model.forecast(features)

predict

predict(
    test_df=None, covariate_df=None, daily_feature=True
)

Perform prediction on the test DataFrame and return a DataFrame with ground truth and forecasted values.

Parameters:

  • test_df (DataFrame, default: None ) –

    The test DataFrame containing the input features for prediction.

  • covariate_df (DataFrame, default: None ) –

    Optional covariate DataFrame; accepted by the signature but not used by the current implementation.

  • daily_feature (bool, default: True ) –

    Flag indicating whether daily features are used in the model. Default is True.

Returns:

  • results_df ( DataFrame ) –

    A DataFrame containing the ground truth and forecasted values, indexed by timestamp.

Source code in mlpforecast/forecaster/common.py
def predict(self, test_df=None, covariate_df=None, daily_feature=True):
    """
    Perform prediction on the test DataFrame and return a DataFrame with ground truth and forecasted values.

    Args:
        test_df (pd.DataFrame): The test DataFrame containing the input features for prediction.
        covariate_df (pd.DataFrame, optional): Optional covariates; accepted but not used by the current implementation.
        daily_feature (bool): Flag indicating whether daily features are used in the model. Default is True.

    Returns:
        results_df (pd.DataFrame): A DataFrame containing the ground truth and forecasted values, indexed by timestamp.
    """
    if (test_df is not None) and (self.train_df is not None):
        test_df = pd.concat([self.train_df, test_df], axis=0)
    else:
        raise ValueError("test_df cannot be None")
    test_df = test_df.sort_values(by=self.model.data_pipeline.date_column)

    ground_truth = self.load_and_prepare_data(test_df, daily_feature)
    time_stamp = ground_truth[[self.model.data_pipeline.date_column]].values
    ground_truth = ground_truth[self.model.data_pipeline.target_series].values

    time_stamp = format_target(
        time_stamp,
        self.model.hparams["input_window_size"],
        self.model.hparams["forecast_horizon"],
        daily_feature=self.model.data_pipeline.daily_features,
    )
    ground_truth = format_target(
        ground_truth,
        self.model.hparams["input_window_size"],
        self.model.hparams["forecast_horizon"],
        daily_feature=self.model.data_pipeline.daily_features,
    )

    start_time = default_timer()
    pred = self.perform_prediction(test_df)
    self.test_walltime = default_timer() - start_time

    # Inverse transform predictions
    scaler = self.model.data_pipeline.data_pipeline.named_steps["scaling"]
    target_scaler = scaler.named_transformers_["target_scaler"]

    N, T, C = pred["pred"].size()
    pred["pred"] = target_scaler.inverse_transform(
        pred["pred"].numpy().reshape(N * T, C)
    )
    pred["pred"] = pred["pred"].reshape(N, T, C)

    # Evaluate point forecast
    self.metrics = self.evaluate_point_forecast(
        ground_truth, pred["pred"], time_stamp
    )
    self.metrics["test-time"] = self.test_walltime
    self.metrics["Model"] = self.model_type.upper()

    # Assert that the prediction and ground truth shapes are the same
    assert (
        pred["pred"].shape == ground_truth.shape
    ), "Shape mismatch: pred['pred'] and ground_truth must have the same shape."

    # Create results DataFrame
    results_df = self.create_results_df(
        time_stamp,
        ground_truth,
        pred["pred"],
        self.model.data_pipeline.target_series,
        self.model.data_pipeline.date_column,
    )
    results_df["Model"] = self.model_type.upper()

    return results_df

format_target

format_target(
    targets,
    input_window_size,
    forecast_horizon,
    daily_feature=True,
)

Format the target data for training the model.

Parameters:

  • targets (DataFrame) –

    Target data.

  • input_window_size (int) –

    Input window size.

  • forecast_horizon (int) –

    Forecast horizon.

  • daily_feature (bool, default: True ) –

    Whether to use daily features. Defaults to True.

Returns:

  • ndarray

    Formatted target data.

Source code in mlpforecast/forecaster/utils.py
def format_target(targets, input_window_size, forecast_horizon, daily_feature=True):
    """
    Format the target data for training the model.

    Args:
        targets (pd.DataFrame): Target data.
        input_window_size (int): Input window size.
        forecast_horizon (int): Forecast horizon.
        daily_feature (bool, optional): Whether to use daily features. Defaults to True.

    Returns:
        (np.ndarray): Formatted target data.
    """
    if daily_feature:
        return extract_daily_sequences(
            targets, input_window_size, forecast_horizon, target_mode=True
        )
    else:
        return extract_target_sequences(targets, input_window_size, forecast_horizon)

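A sketch of the call pattern. The exact output shape is determined by extract_daily_sequences and extract_target_sequences, but downstream code on this page treats the result as a 3-D array of shape (windows, horizon, targets):

import numpy as np

targets = np.random.rand(500, 1)  # 500 time steps, one target series
windows = format_target(
    targets,
    input_window_size=96,
    forecast_horizon=96,
    daily_feature=False,  # plain sliding windows rather than day-aligned ones
)
print(windows.shape)  # expected: (num_windows, 96, 1)
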
get_latest_checkpoint

get_latest_checkpoint(checkpoint_path)

Get the path of the latest checkpoint file.

Parameters:

  • checkpoint_path (str) –

    Path to the directory containing the checkpoint files.

Returns:

  • latest_file ( str ) –

    Path of the latest checkpoint file.

Source code in mlpforecast/forecaster/utils.py
def get_latest_checkpoint(checkpoint_path):
    """
    Get the path of the latest checkpoint file.

    Args:
        checkpoint_path (str): Path to the directory containing the checkpoint files.

    Returns:
        latest_file (str): Path of the latest checkpoint file.
    """
    checkpoint_path = str(checkpoint_path)
    list_of_files = glob.glob(checkpoint_path + "/*.ckpt")

    if list_of_files:
        latest_file = max(list_of_files, key=os.path.getctime)
    else:
        latest_file = None
    return latest_file
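
A usage sketch; the directory path is hypothetical. The function returns None when no .ckpt files are present, which is how fit starts training from scratch via ckpt_path=None:

ckpt = get_latest_checkpoint("experiments/checkpoints")
if ckpt is None:
    print("no checkpoint found; training starts from scratch")
else:
    print(f"resuming from {ckpt}")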