Production Monitoring#

Source file
  • twiga/serve/monitor.py - ForecastMonitor

Overview#

ForecastMonitor uses Evidently to detect distribution shifts in model inputs and track regression performance once ground truth is available. It is used both inside the FastAPI serving layer (via the /monitor/drift and /monitor/performance endpoints) and inside Prefect flows (via the run_drift_check task).

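Install Twiga with the mlops extra (quoted so that shells such as zsh do not expand the brackets):

pip install "twiga[mlops]"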

Quick start#

from twiga.serve import ForecastMonitor

monitor = ForecastMonitor(report_dir="reports/", target_col="Load(MW)")
monitor.set_reference(train_df)

# At inference time - check whether the input distribution has shifted
# run_data_drift returns a tuple of (is_alert, summary)
is_alert, summary = monitor.run_data_drift(current_df)
print(is_alert, summary["drift_detected"], summary["n_drifted_features"])

# Once ground truth is available - check model accuracy
perf = monitor.run_performance(df_with_actuals)
print(perf["mae"], perf["rmse"])

Report types#

Data drift#

Compares numeric feature distributions between the reference (training) and current (production) datasets using Evidently’s DataDriftPreset:

is_alert, summary = monitor.run_data_drift(
    current_df, feature_cols=["temperature", "hour", "weekday"]
)

Returns a tuple (is_alert, summary); the summary has the form:

{
    "drift_detected": True,
    "n_drifted_features": 3,
    "feature_drift": {
        "temperature": 0.031,
        "hour": 0.002,
        "weekday": 0.187,
    },
    "report_path": "/abs/path/to/reports/data_drift_20240601T120000Z.html",
}

Prediction drift#

Detects shifts in the model’s output distribution - useful when ground truth is not yet available but you still want to monitor forecast stability:

is_alert, summary = monitor.run_prediction_drift(
    current_predictions=current_preds,    # np.ndarray
    reference_predictions=train_preds,    # np.ndarray
)

Both arrays are automatically flattened to 1-D so multi-step forecast arrays (shape (n_samples, horizon, n_targets)) are handled transparently.

Regression performance#

Once ground truth becomes available, run_performance computes MAE, RMSE, and MAPE against the actuals. The current DataFrame must contain both the target and the prediction columns:

perf = monitor.run_performance(
    current_df,
    target_col="Load(MW)",
    prediction_col="pred_Load(MW)",
)
# → {"mae": 4.23, "rmse": 6.11, "mape": 0.034, "report_path": "..."}
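For reference, the three metrics can be reproduced by hand. The sketch below uses the standard textbook definitions in plain Python; it is not the monitor's implementation. It assumes MAPE is reported as a fraction (as the 0.034 in the sample output suggests) and that no actual value is zero. The function name regression_metrics is hypothetical.

```python
import math


def regression_metrics(actuals, predictions):
    """Textbook MAE, RMSE and MAPE for two equal-length sequences."""
    if len(actuals) != len(predictions) or not actuals:
        raise ValueError("inputs must be non-empty and of equal length")
    errors = [p - a for a, p in zip(actuals, predictions)]
    n = len(errors)
    mae = sum(abs(e) for e in errors) / n
    rmse = math.sqrt(sum(e * e for e in errors) / n)
    # MAPE as a fraction; undefined when an actual value is zero.
    mape = sum(abs(e / a) for a, e in zip(actuals, errors)) / n
    return {"mae": mae, "rmse": rmse, "mape": mape}


metrics = regression_metrics([100.0, 200.0, 400.0], [110.0, 190.0, 400.0])
```

RMSE is never smaller than MAE on the same data, so a widening gap between the two in production usually points to a few large errors rather than a uniform loss of accuracy.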

HTML reports#

Every report method saves a self-contained HTML file to report_dir with a UTC timestamp suffix:

reports/
├── data_drift_20240601T120000Z.html
├── prediction_drift_20240602T083012Z.html
└── performance_20240603T000000Z.html

The absolute path is returned in report_path so it can be logged to MLflow or exposed via the API response.
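The naming scheme shown in the listing can be reproduced with the standard library. This is a minimal sketch of how such a path could be built, not the code ForecastMonitor uses; the helper name report_path and its now parameter are hypothetical.

```python
from datetime import datetime, timezone
from pathlib import Path


def report_path(report_dir, prefix, now=None):
    """Build an absolute path such as <report_dir>/<prefix>_20240601T120000Z.html."""
    now = now or datetime.now(timezone.utc)
    # Normalise to UTC so the trailing "Z" in the stamp is truthful.
    stamp = now.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return (Path(report_dir) / f"{prefix}_{stamp}.html").resolve()


fixed = datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
path = report_path("reports", "data_drift", now=fixed)
```

Because the stamp sorts lexicographically in chronological order, the newest report of a given type is simply the last entry of a sorted directory listing.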


API reference#

class twiga.serve.monitor.ForecastMonitor(report_dir='reports', target_col='target', prediction_col='prediction', nan_policy='skip')#

Bases: object

Monitoring suite for deployed forecasting models using Evidently.

This class handles feature data drift, prediction drift, and regression performance monitoring. It automatically saves HTML + JSON reports and logs key metrics to MLflow when an active run exists.

Variables:
  • report_dir – Directory where HTML and JSON reports are saved.

  • target_col – Name of the target column in performance data.

  • prediction_col – Default name for single prediction column.

  • nan_policy – Policy for handling missing values (“drop”, “fill”, “skip”).

__init__(report_dir='reports', target_col='target', prediction_col='prediction', nan_policy='skip')#

Initialize the ForecastMonitor.

Parameters:
  • report_dir (str | Path) – Directory for saving monitoring reports.

  • target_col (str) – Column name for the target variable.

  • prediction_col (str) – Default column name for predictions.

  • nan_policy (Literal['drop', 'fill', 'skip']) – How to handle NaN values in reference/current data.

run_batch_monitoring(*, current_features_df, current_predictions_df, reference_predictions=None, target_col=None, prediction_col=None, feature_cols=None, drift_threshold=0.5)#

Run the four production-monitoring reports against one window.

Generates data drift, target drift, prediction drift, and regression performance reports — each as its own Evidently HTML/JSON pair — from a single trailing window of captured production traffic.

Parameters:
  • current_features_df (DataFrame) – Captured feature rows in the window. Drives data drift and (if the target column is present) target drift.

  • current_predictions_df (DataFrame | None) – Joined timestamp × target × prediction rows. Drives prediction drift and, when an actuals column is also present, performance.

  • reference_predictions (ndarray | Series | DataFrame | None) – Reference predictions to compare against for prediction drift. When None, prediction drift is skipped.

  • target_col (str | None) – Override the target column name.

  • prediction_col (str | None) – Override the prediction column name.

  • feature_cols (list[str] | None) – Restrict data drift to these features. If None, uses the intersection of reference and current columns.

  • drift_threshold (float) – Alert threshold for the drift flag.

Return type:

dict[str, Any]

Returns:

Dict with per-section results, report paths, and an overall alert.
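As an illustration of how per-section results and an overall alert might fit together, the sketch below merges (is_alert, summary) pairs into one dictionary and leaves out skipped sections. The function name combine_results and the key names are hypothetical; the actual layout of the returned dict may differ.

```python
def combine_results(**sections):
    """Merge (is_alert, summary) pairs into one result dict.

    Sections passed as None (skipped checks) are left out.
    """
    results = {name: pair for name, pair in sections.items() if pair is not None}
    return {
        # The overall alert fires as soon as any single section alerts.
        "alert": any(is_alert for is_alert, _ in results.values()),
        **{name: summary for name, (_, summary) in results.items()},
    }


result = combine_results(
    data_drift=(False, {"n_drifted_features": 0}),
    prediction_drift=(True, {"n_drifted_features": 1}),
    target_drift=None,  # e.g. target column missing from the window
)
```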

run_data_drift(current_df, feature_cols=None, categorical_cols=None, time_col=None, drift_threshold=0.5)#

Evaluate distribution shift in input features.

Parameters:
  • current_df (DataFrame) – Current production data.

  • feature_cols (list[str] | None) – Specific features to monitor. If None, uses the intersection of reference and current columns.

  • categorical_cols (list[str] | None) – Explicit list of categorical columns.

  • time_col (str | None) – Column for optional temporal sorting.

  • drift_threshold (float) – Threshold for alerting on dataset drift score.

Return type:

tuple[bool, dict[str, Any]]

Returns:

Tuple of (is_alert, summary_dict).
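The comment in the Prefect example on this page suggests that drift_threshold is compared against the share of features that drifted. Under that assumption, the alert decision could look like the sketch below. The function dataset_drift_alert and the share_drifted key are hypothetical and not part of ForecastMonitor.

```python
def dataset_drift_alert(feature_drifted, drift_threshold=0.5):
    """Return (is_alert, summary) from a {feature: bool} mapping."""
    n_features = len(feature_drifted)
    n_drifted = sum(1 for drifted in feature_drifted.values() if drifted)
    share = n_drifted / n_features if n_features else 0.0
    summary = {
        "n_drifted_features": n_drifted,
        "share_drifted": share,  # hypothetical key, for illustration only
    }
    # Strict comparison: exactly half of the features drifting does not alert.
    return share > drift_threshold, summary


is_alert, summary = dataset_drift_alert(
    {"temperature": True, "hour": True, "weekday": False}
)
```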

run_forecast_performance(result, metric_names=None, **evaluate_kwargs)#

Evaluate forecast quality using Twiga-native metrics for any forecast kind.

Dispatches to the correct evaluation function based on result.kind, aggregates per-day / per-target metrics to scalars, and logs them to MLflow when an active run exists.

Supported kinds and the metrics available for each:

  • "point" — mae, rmse, mape, wmape, smape, corr, nbias, r2, …

  • "interval" — picp, mwi, winkler, msis, nmpi, cwe, mae, rmse, …

  • "quantile" — pinball, wql, crps, calibration_error, sharpness, …

  • "parametric" / "samples" — nll, crps, dss, energy_score, …

Parameters:
  • result (ForecastResult) – A ForecastResult with ground_truth set. The kind attribute determines which metrics are computed.

  • metric_names (list[str] | None) – Names of metrics to compute. None uses all defaults for the forecast kind. Pass forecaster.metrics here to use the forecaster’s configured metric list.

  • **evaluate_kwargs (Any) – Extra keyword arguments forwarded to the underlying evaluate function, e.g. alpha=0.1 for interval/quantile forecasts, spread="iqr" for interval.

Return type:

dict[str, Any]

Returns:

Dict containing

  • One scalar value per metric (mean across days and targets).

  • "kind" — forecast kind string.

  • "timestamp" — ISO-8601 UTC timestamp.

  • "per_target" - {target_name: {metric: value}} for multi-target inspection.
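To make the shape of this return value concrete, the sketch below reduces per-day metric values to one scalar per metric plus a per_target breakdown. It assumes a plain unweighted mean; the function name aggregate, the input layout, and the second target "Price" are hypothetical.

```python
from statistics import fmean


def aggregate(per_day):
    """Reduce {target: {metric: [daily values]}} to scalars.

    Returns the overall mean per metric plus a "per_target" breakdown.
    """
    per_target = {
        target: {name: fmean(values) for name, values in metrics.items()}
        for target, metrics in per_day.items()
    }
    metric_names = {name for metrics in per_target.values() for name in metrics}
    # Mean of per-target means; equal to the mean over all days and targets
    # only when every target has the same number of days.
    summary = {
        name: fmean(m[name] for m in per_target.values() if name in m)
        for name in sorted(metric_names)
    }
    summary["per_target"] = per_target
    return summary


summary = aggregate({
    "Load(MW)": {"mae": [4.0, 5.0]},
    "Price": {"mae": [1.0, 2.0]},
})
```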

Raises:

Example:

result = forecaster.predict(test_df)
summary = monitor.run_forecast_performance(result, metric_names=forecaster.metrics or None)
print(summary["mae"], summary["kind"])

run_full_monitoring(current_df, current_predictions, reference_predictions, feature_cols=None, categorical_cols=None, target_col=None, prediction_col=None, drift_threshold=0.5)#

Run data drift, prediction drift, and performance monitoring in one call.

Parameters:
  • current_df (DataFrame) – Current production features and actuals.

  • current_predictions (ndarray | Series | DataFrame) – Model predictions on current data.

  • reference_predictions (ndarray | Series | DataFrame) – Reference (baseline) predictions.

  • feature_cols (list[str] | None) – Features to monitor for data drift.

  • categorical_cols (list[str] | None) – Explicit categorical columns.

  • target_col (str | None) – Override target column.

  • prediction_col (str | None) – Override prediction column.

  • drift_threshold (float) – Alert threshold for both drift types.

Return type:

dict[str, Any]

Returns:

Dictionary containing overall alert flag and detailed results.

run_performance(current_df, target_col=None, prediction_col=None)#

Track regression performance metrics (MAE, RMSE, MAPE).

Parameters:
  • current_df (DataFrame) – DataFrame containing both target and predictions.

  • target_col (str | None) – Override for target column name.

  • prediction_col (str | None) – Override for prediction column name.

Return type:

dict[str, Any]

Returns:

Dictionary with performance metrics and report path.

run_prediction_drift(current_predictions, reference_predictions, prediction_cols=None, drift_threshold=0.5)#

Evaluate shift in model predictions.

Supports single-output and multi-output forecasts.

Parameters:
  • current_predictions (ndarray | Series | DataFrame) – Predictions from the current period.

  • reference_predictions (ndarray | Series | DataFrame) – Predictions from the reference period.

  • prediction_cols (list[str] | None) – Custom column names for multi-output cases.

  • drift_threshold (float) – Threshold for alerting.

Return type:

tuple[bool, dict[str, Any]]

Returns:

Tuple of (is_alert, summary_dict).

run_target_drift(current_df, target_col=None, drift_threshold=0.5)#

Evaluate distribution shift of the target between reference and current.

Thin wrapper around DataDriftPreset restricted to a single column — the target — so the result is reported as “target drift” with its own report file rather than buried inside the full feature-drift report.

Parameters:
  • current_df (DataFrame) – Current production data, must contain the target column.

  • target_col (str | None) – Override the target column name. Defaults to self.target_col.

  • drift_threshold (float) – Alert threshold on the binary drift flag.

Return type:

tuple[bool, dict[str, Any]]

Returns:

Tuple of (is_alert, summary_dict).

set_reference(reference_df)#

Set the reference (baseline) dataset for drift comparison.

Parameters:

reference_df (DataFrame) – DataFrame representing the training or baseline data.

Return type:

None


Integration with the FastAPI app#

ForecastMonitor is wired into the FastAPI app automatically by create_app. The reference dataset is initialised at startup from the forecaster’s data pipeline:

sequenceDiagram
    participant API as FastAPI
    participant MON as ForecastMonitor
    participant EV  as Evidently

    Note over API,MON: Startup
    API->>MON: set_reference(train_df)

    Note over API,EV: POST /monitor/drift
    API->>MON: run_data_drift(current_df)
    MON->>EV: Report(DataDriftPreset)
    EV-->>MON: drift summary + HTML
    MON-->>API: {"drift_detected": true, …}
    

See Serving for the full endpoint reference.

Integration with Prefect#

In the retraining flow, run_drift_check wraps ForecastMonitor as a Prefect task:

from twiga.pipeline import retraining_flow

result = retraining_flow(
    forecaster=forecaster,
    reference_data_path="data/train.parquet",
    current_data_path="data/current.parquet",
    drift_threshold=0.5,          # retrain if more than 50% of features drift
    performance_threshold=0.15,   # also retrain if MAE > 0.15
)
print(result["drift_summary"])
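Read together, the two threshold comments describe an either/or rule. A minimal sketch of that rule, assuming drift is measured as the share of drifted features and performance as MAE, is shown below; should_retrain is a hypothetical helper, not a function exported by twiga.pipeline.

```python
def should_retrain(share_drifted, mae, drift_threshold=0.5, performance_threshold=0.15):
    """Retrain when either the drift share or the MAE exceeds its threshold."""
    return share_drifted > drift_threshold or mae > performance_threshold


decisions = [
    should_retrain(share_drifted=0.6, mae=0.10),  # drift alone triggers
    should_retrain(share_drifted=0.2, mae=0.20),  # error alone triggers
    should_retrain(share_drifted=0.2, mae=0.10),  # neither triggers
]
```

Note that a threshold of 0.15 on MAE is only meaningful relative to the scale of the target, so it should be chosen in the same units as the forecast.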

See Pipeline for the complete retraining flow reference.