Production Monitoring#
Source file: twiga/serve/monitor.py (class ForecastMonitor)
Overview#
ForecastMonitor uses Evidently to detect
distribution shifts in model inputs and track regression performance once
ground truth is available. It is used both inside the FastAPI serving layer
(via the /monitor/drift and /monitor/performance endpoints) and inside
Prefect flows (via the run_drift_check task).
Install the monitoring extras first:
pip install twiga[mlops]
Quick start#
from twiga.serve import ForecastMonitor
monitor = ForecastMonitor(report_dir="reports/", target_col="Load(MW)")
monitor.set_reference(train_df)
# At inference time - check whether the input distribution has shifted
is_alert, summary = monitor.run_data_drift(current_df)
print(summary["drift_detected"], summary["n_drifted_features"])
# Once ground truth is available - check model accuracy
perf = monitor.run_performance(df_with_actuals)
print(perf["mae"], perf["rmse"])
Report types#
Data drift#
Compares numeric feature distributions between the reference (training) and
current (production) datasets using Evidently’s DataDriftPreset:
is_alert, summary = monitor.run_data_drift(
    current_df, feature_cols=["temperature", "hour", "weekday"]
)
The summary dict (the second element of the returned tuple) looks like:
{
"drift_detected": True,
"n_drifted_features": 3,
"feature_drift": {
"temperature": 0.031,
"hour": 0.002,
"weekday": 0.187,
},
"report_path": "/abs/path/to/reports/data_drift_20240601T120000Z.html",
}
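The per-feature values come from Evidently's per-column drift tests; their scale depends on the test the preset selects, so treat the ranking in this sketch as illustrative:
# Rank features by drift score for manual inspection; whether a low or high
# score signals drift depends on the statistical test Evidently applied.
for feature, score in sorted(summary["feature_drift"].items(), key=lambda kv: kv[1]):
    print(f"{feature}: {score:.3f}")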
Prediction drift#
Detects shifts in the model’s output distribution. This is useful when ground truth is not yet available but you still want to monitor forecast stability:
is_alert, summary = monitor.run_prediction_drift(
    current_predictions=current_preds,    # np.ndarray
    reference_predictions=train_preds,    # np.ndarray
)
Both arrays are automatically flattened to 1-D so multi-step forecast arrays
(shape (n_samples, horizon, n_targets)) are handled transparently.
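As an illustration of that flattening (the shapes here are made up for the sketch, not library defaults):
import numpy as np

# Hypothetical multi-step forecasts: 100 samples, 24-step horizon, 2 targets.
preds = np.random.rand(100, 24, 2)

# Equivalent to what the monitor does before comparing distributions:
# collapse the array into a single 1-D sample of forecast values.
flat = preds.reshape(-1)
print(flat.shape)  # (4800,)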
Regression performance#
Once ground truth becomes available, compute MAE, RMSE, and MAPE against actuals. Requires the current DataFrame to contain both the target and prediction columns:
perf = monitor.run_performance(
current_df,
target_col="Load(MW)",
prediction_col="pred_Load(MW)",
)
# → {"mae": 4.23, "rmse": 6.11, "mape": 0.034, "report_path": "..."}
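The metrics follow their standard definitions, so they can be sanity-checked directly against the two columns. A minimal sketch, assuming the column names from the example above:
import numpy as np

y = current_df["Load(MW)"].to_numpy()
y_hat = current_df["pred_Load(MW)"].to_numpy()

mae = np.mean(np.abs(y - y_hat))
rmse = np.sqrt(np.mean((y - y_hat) ** 2))
mape = np.mean(np.abs((y - y_hat) / y))  # reported as a fraction (0.034 ≈ 3.4%)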
HTML reports#
Every report method saves a self-contained HTML file to report_dir with a
UTC timestamp suffix:
reports/
├── data_drift_20240601T120000Z.html
├── prediction_drift_20240602T083012Z.html
└── performance_20240603T000000Z.html
The absolute path is returned in report_path so it can be logged to MLflow
or exposed via the API response.
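For example, a minimal sketch of attaching a report to an MLflow run (an mlflow setup is assumed; note the class already logs key metrics itself when a run is active):
import mlflow

is_alert, summary = monitor.run_data_drift(current_df)

with mlflow.start_run(run_name="drift-check"):
    mlflow.log_metric("n_drifted_features", summary["n_drifted_features"])
    mlflow.log_artifact(summary["report_path"])  # upload the self-contained HTML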
API reference#
- class twiga.serve.monitor.ForecastMonitor(report_dir='reports', target_col='target', prediction_col='prediction', nan_policy='skip')#
Bases: object
Monitoring suite for deployed forecasting models using Evidently.
This class handles feature data drift, prediction drift, and regression performance monitoring. It automatically saves HTML + JSON reports and logs key metrics to MLflow when an active run exists.
- Variables:
report_dir – Directory where HTML and JSON reports are saved.
target_col – Name of the target column in performance data.
prediction_col – Default name for single prediction column.
nan_policy – Policy for handling missing values (“drop”, “fill”, “skip”).
- __init__(report_dir='reports', target_col='target', prediction_col='prediction', nan_policy='skip')#
Initialize the ForecastMonitor.
- run_data_drift(current_df, feature_cols=None, categorical_cols=None, time_col=None, drift_threshold=0.5)#
Evaluate distribution shift in input features.
- Parameters:
  - current_df (DataFrame) – Current production data.
  - feature_cols (list[str] | None) – Specific features to monitor (if None, uses the intersection of reference and current columns).
  - categorical_cols (list[str] | None) – Explicit list of categorical columns.
  - time_col (str | None) – Column for optional temporal sorting.
  - drift_threshold (float) – Threshold for alerting on the dataset drift score.
- Return type:
  tuple[bool, dict]
- Returns:
  Tuple of (is_alert, summary_dict).
- run_full_monitoring(current_df, current_predictions, reference_predictions, feature_cols=None, categorical_cols=None, target_col=None, prediction_col=None, drift_threshold=0.5)#
Run data drift, prediction drift, and performance monitoring in one call.
- Parameters:
  - current_df (DataFrame) – Current production features and actuals.
  - current_predictions (ndarray | Series | DataFrame) – Model predictions on current data.
  - reference_predictions (ndarray | Series | DataFrame) – Reference (baseline) predictions.
  - feature_cols (list[str] | None) – Features to monitor for data drift.
  - categorical_cols (list[str] | None) – Explicit categorical columns.
  - target_col (str | None) – Override for the instance-level target column.
  - prediction_col (str | None) – Override for the instance-level prediction column.
  - drift_threshold (float) – Alert threshold for both drift types.
- Return type:
  dict
- Returns:
  Dictionary containing overall alert flag and detailed results.
- run_performance(current_df, target_col=None, prediction_col=None)#
Track regression performance metrics (MAE, RMSE, MAPE).
- Parameters:
  - current_df (DataFrame) – Current data containing both actual and predicted values.
  - target_col (str | None) – Override for the instance-level target column.
  - prediction_col (str | None) – Override for the instance-level prediction column.
- Return type:
  dict
- Returns:
  Dictionary with performance metrics and report path.
- run_prediction_drift(current_predictions, reference_predictions, prediction_cols=None, drift_threshold=0.5)#
Evaluate shift in model predictions.
Supports single-output and multi-output forecasts.
- Parameters:
  - current_predictions (ndarray | Series | DataFrame) – Predictions from the current period.
  - reference_predictions (ndarray | Series | DataFrame) – Predictions from the reference period.
  - prediction_cols (list[str] | None) – Custom column names for multi-output cases.
  - drift_threshold (float) – Threshold for alerting.
- Return type:
  tuple[bool, dict]
- Returns:
  Tuple of (is_alert, summary_dict).
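Putting the pieces together, a sketch of a single run_full_monitoring call (the data objects are placeholders; the exact keys of the returned dictionary are not shown here):
results = monitor.run_full_monitoring(
    current_df=current_df,              # features and actuals
    current_predictions=current_preds,  # model output on the current data
    reference_predictions=train_preds,  # baseline predictions
    drift_threshold=0.5,
)
print(results)  # overall alert flag plus per-check details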
Integration with the FastAPI app#
ForecastMonitor is wired into the FastAPI app automatically by create_app.
The reference dataset is initialized at startup from the forecaster’s data
pipeline:
sequenceDiagram
participant API as FastAPI
participant MON as ForecastMonitor
participant EV as Evidently
Note over API,MON: Startup
API->>MON: set_reference(train_df)
Note over API,EV: POST /monitor/drift
API->>MON: run_data_drift(current_df)
MON->>EV: Report(DataDriftPreset)
EV-->>MON: drift summary + HTML
MON-->>API: {"drift_detected": true, …}
See Serving for the full endpoint reference.
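From a client’s perspective, the drift endpoint can be exercised like the hypothetical sketch below (the request payload shape is an assumption; see Serving for the actual schema):
import requests

resp = requests.post(
    "http://localhost:8000/monitor/drift",
    json={"records": current_df.to_dict(orient="records")},  # hypothetical payload
)
print(resp.json())  # e.g. {"drift_detected": true, ...}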
Integration with Prefect#
In the retraining flow, run_drift_check wraps ForecastMonitor as a
Prefect task:
from twiga.pipeline import retraining_flow
result = retraining_flow(
forecaster=forecaster,
reference_data_path="data/train.parquet",
current_data_path="data/current.parquet",
drift_threshold=0.5,         # retrain if > 50% of features drift
performance_threshold=0.15, # also retrain if MAE > 0.15
)
print(result["drift_summary"])
See Pipeline for the complete retraining flow reference.