Production Monitoring#

Source file
  • twiga/serve/monitor.py - ForecastMonitor

Overview#

ForecastMonitor uses Evidently to detect distribution shifts in model inputs and track regression performance once ground truth is available. It is used both inside the FastAPI serving layer (via the /monitor/drift and /monitor/performance endpoints) and inside Prefect flows (via the run_drift_check task).

pip install "twiga[mlops]"

Quick start#

from twiga.serve import ForecastMonitor

monitor = ForecastMonitor(report_dir="reports/", target_col="Load(MW)")
monitor.set_reference(train_df)

# At inference time: check whether the input distribution has shifted.
# run_data_drift returns a tuple of (is_alert, summary_dict).
is_alert, summary = monitor.run_data_drift(current_df)
print(summary["drift_detected"], summary["n_drifted_features"])

# Once ground truth is available: check model accuracy
perf = monitor.run_performance(df_with_actuals)
print(perf["mae"], perf["rmse"])

Report types#

Data drift#

Compares numeric feature distributions between the reference (training) and current (production) datasets using Evidently’s DataDriftPreset:

features = ["temperature", "hour", "weekday"]
is_alert, summary = monitor.run_data_drift(current_df, feature_cols=features)

The summary dictionary has the following shape:

{
    "drift_detected": True,
    "n_drifted_features": 3,
    "feature_drift": {
        "temperature": 0.031,
        "hour": 0.002,
        "weekday": 0.187,
    },
    "report_path": "/abs/path/to/reports/data_drift_20240601T120000Z.html",
}

Prediction drift#

Detects shifts in the model’s output distribution, which is useful when ground truth is not yet available but you still want to monitor forecast stability:

is_alert, summary = monitor.run_prediction_drift(
    current_predictions=current_preds,    # np.ndarray
    reference_predictions=train_preds,    # np.ndarray
)

Both arrays are automatically flattened to 1-D so multi-step forecast arrays (shape (n_samples, horizon, n_targets)) are handled transparently.
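The flattening step can be pictured with plain NumPy. This is only an illustrative sketch: flatten_predictions is a hypothetical helper, not part of twiga, and the array shape is made up for the example.

```python
import numpy as np

# Hypothetical multi-step forecast: 4 samples, 24-step horizon, 2 targets.
preds = np.arange(4 * 24 * 2, dtype=float).reshape(4, 24, 2)


def flatten_predictions(values):
    """Collapse predictions of any shape into a 1-D float array."""
    return np.asarray(values, dtype=float).ravel()


flat = flatten_predictions(preds)
print(flat.shape)  # (192,)
```

Note that flattening pools every horizon step and target into a single distribution, so a shift confined to one horizon or one target may be diluted in the pooled comparison.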

Regression performance#

Once ground truth becomes available, compute MAE, RMSE, and MAPE against actuals. The current DataFrame must contain both the target and prediction columns:

perf = monitor.run_performance(
    current_df,
    target_col="Load(MW)",
    prediction_col="pred_Load(MW)",
)
# → {"mae": 4.23, "rmse": 6.11, "mape": 0.034, "report_path": "..."}
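For readers who want to sanity-check the reported numbers, the three metrics can be recomputed by hand. The sketch below uses the standard textbook definitions and is not taken from the twiga or Evidently source; regression_metrics is a hypothetical helper, and returning MAPE as a fraction (0.05 meaning 5 %) is an assumption about the scale.

```python
import numpy as np


def regression_metrics(actual, predicted):
    """Return MAE, RMSE, and MAPE using their standard definitions."""
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    errors = predicted - actual
    return {
        "mae": float(np.mean(np.abs(errors))),
        "rmse": float(np.sqrt(np.mean(errors ** 2))),
        # MAPE as a fraction; it is undefined when any actual value is zero.
        "mape": float(np.mean(np.abs(errors / actual))),
    }


metrics = regression_metrics([100.0, 200.0, 400.0], [110.0, 190.0, 380.0])
print(metrics)
```

Because MAPE divides by the actual values, it becomes unstable for targets near zero; MAE and RMSE are usually the safer signals in that case.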

HTML reports#

Every report method saves a self-contained HTML file to report_dir with a UTC timestamp suffix:

reports/
├── data_drift_20240601T120000Z.html
├── prediction_drift_20240602T083012Z.html
└── performance_20240603T000000Z.html

The absolute path is returned in report_path so it can be logged to MLflow or exposed via the API response.
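The naming scheme shown in the directory listing can be reproduced with the standard library. This is a sketch of the convention only, not the code ForecastMonitor uses; report_path and its now parameter are hypothetical names introduced for illustration.

```python
from datetime import datetime, timezone
from pathlib import Path


def report_path(report_dir, report_type, now=None):
    """Build an absolute '<type>_<UTC timestamp>.html' path inside report_dir."""
    now = now or datetime.now(timezone.utc)
    # Normalise to UTC so the trailing 'Z' in the file name is accurate.
    stamp = now.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return (Path(report_dir) / f"{report_type}_{stamp}.html").resolve()


fixed = datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
path = report_path("reports", "data_drift", now=fixed)
print(path.name)  # data_drift_20240601T120000Z.html
```

A path like this can be handed to mlflow.log_artifact inside an active run to attach the HTML report to that run.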


API reference#

class twiga.serve.monitor.ForecastMonitor(report_dir='reports', target_col='target', prediction_col='prediction', nan_policy='skip')#

Bases: object

Monitoring suite for deployed forecasting models using Evidently.

This class handles feature data drift, prediction drift, and regression performance monitoring. It automatically saves HTML + JSON reports and logs key metrics to MLflow when an active run exists.

Variables:
  • report_dir – Directory where HTML and JSON reports are saved.

  • target_col – Name of the target column in performance data.

  • prediction_col – Default name for single prediction column.

  • nan_policy – Policy for handling missing values (“drop”, “fill”, “skip”).

__init__(report_dir='reports', target_col='target', prediction_col='prediction', nan_policy='skip')#

Initialize the ForecastMonitor.

Parameters:
  • report_dir (str | Path) – Directory for saving monitoring reports.

  • target_col (str) – Column name for the target variable.

  • prediction_col (str) – Default column name for predictions.

  • nan_policy (Literal['drop', 'fill', 'skip']) – How to handle NaN values in reference/current data.

run_data_drift(current_df, feature_cols=None, categorical_cols=None, time_col=None, drift_threshold=0.5)#

Evaluate distribution shift in input features.

Parameters:
  • current_df (DataFrame) – Current production data.

  • feature_cols (list[str] | None) – Specific features to monitor (if None, uses the columns present in both the reference and current data).

  • categorical_cols (list[str] | None) – Explicit list of categorical columns.

  • time_col (str | None) – Column for optional temporal sorting.

  • drift_threshold (float) – Threshold for alerting on dataset drift score.

Return type:

tuple[bool, dict[str, Any]]

Returns:

Tuple of (is_alert, summary_dict).

run_full_monitoring(current_df, current_predictions, reference_predictions, feature_cols=None, categorical_cols=None, target_col=None, prediction_col=None, drift_threshold=0.5)#

Run data drift, prediction drift, and performance monitoring in one call.

Parameters:
  • current_df (DataFrame) – Current production features and actuals.

  • current_predictions (ndarray | Series | DataFrame) – Model predictions on current data.

  • reference_predictions (ndarray | Series | DataFrame) – Reference (baseline) predictions.

  • feature_cols (list[str] | None) – Features to monitor for data drift.

  • categorical_cols (list[str] | None) – Explicit categorical columns.

  • target_col (str | None) – Override target column.

  • prediction_col (str | None) – Override prediction column.

  • drift_threshold (float) – Alert threshold for both drift types.

Return type:

dict[str, Any]

Returns:

Dictionary containing overall alert flag and detailed results.

run_performance(current_df, target_col=None, prediction_col=None)#

Track regression performance metrics (MAE, RMSE, MAPE).

Parameters:
  • current_df (DataFrame) – DataFrame containing both target and predictions.

  • target_col (str | None) – Override for target column name.

  • prediction_col (str | None) – Override for prediction column name.

Return type:

dict[str, Any]

Returns:

Dictionary with performance metrics and report path.

run_prediction_drift(current_predictions, reference_predictions, prediction_cols=None, drift_threshold=0.5)#

Evaluate shift in model predictions.

Supports single-output and multi-output forecasts.

Parameters:
  • current_predictions (ndarray | Series | DataFrame) – Predictions from the current period.

  • reference_predictions (ndarray | Series | DataFrame) – Predictions from the reference period.

  • prediction_cols (list[str] | None) – Custom column names for multi-output cases.

  • drift_threshold (float) – Threshold for alerting.

Return type:

tuple[bool, dict[str, Any]]

Returns:

Tuple of (is_alert, summary_dict).

set_reference(reference_df)#

Set the reference (baseline) dataset for drift comparison.

Parameters:

reference_df (DataFrame) – DataFrame representing the training or baseline data.

Return type:

None


Integration with the FastAPI app#

ForecastMonitor is wired into the FastAPI app automatically by create_app. The reference dataset is initialised at startup from the forecaster’s data pipeline:

sequenceDiagram
    participant API as FastAPI
    participant MON as ForecastMonitor
    participant EV  as Evidently

    Note over API,MON: Startup
    API->>MON: set_reference(train_df)

    Note over API,EV: POST /monitor/drift
    API->>MON: run_data_drift(current_df)
    MON->>EV: Report(DataDriftPreset)
    EV-->>MON: drift summary + HTML
    MON-->>API: {"drift_detected": true, …}
    

See Serving for the full endpoint reference.

Integration with Prefect#

In the retraining flow, run_drift_check wraps ForecastMonitor as a Prefect task:

from twiga.pipeline import retraining_flow

result = retraining_flow(
    forecaster=forecaster,
    reference_data_path="data/train.parquet",
    current_data_path="data/current.parquet",
    drift_threshold=0.5,          # retrain if > 50 % of features drift
    performance_threshold=0.15,   # also retrain if MAE > 0.15
)
print(result["drift_summary"])
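The two thresholds in the call above suggest a simple decision rule, sketched below. The rule is inferred from the inline comments only and is an assumption about how the flow combines the two signals; should_retrain and its arguments are hypothetical and do not appear in twiga.

```python
def should_retrain(n_drifted, n_features, mae,
                   drift_threshold=0.5, performance_threshold=0.15):
    """Return True when either the drift or the performance check fires."""
    # Share of monitored features flagged as drifted (0.0 if none are monitored).
    drift_share = n_drifted / n_features if n_features else 0.0
    drift_alert = drift_share > drift_threshold
    performance_alert = mae > performance_threshold
    return drift_alert or performance_alert


print(should_retrain(n_drifted=3, n_features=4, mae=0.10))  # True
print(should_retrain(n_drifted=1, n_features=4, mae=0.10))  # False
```

With a strict greater-than comparison, exactly half of the features drifting does not trigger retraining at the default threshold; whether the real flow uses > or >= is not stated in this page.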

See Pipeline for the complete retraining flow reference.