MLOps: Tracking, Serving & Monitoring#
What you’ll build
A complete MLOps pipeline around the LightGBM net-load forecaster from Tutorial 01: from experiment tracking all the way through drift-triggered retraining and hot model reload in a FastAPI server.
Prerequisites
Tutorial 01 (Getting Started) - you should know how to configure and fit a TwigaForecaster
Basic HTTP / REST concepts (GET, POST, JSON)
Learning objectives
By the end of this notebook you will be able to:
Track experiments with TwigaTracker (dataset lineage, params, metrics, artefacts)
Save and load versioned checkpoints via on_save_checkpoint / on_load_checkpoint
Detect data drift and monitor forecast performance with ForecastMonitor
Serve forecasts over HTTP with create_app() (FastAPI) and call all endpoints
Run Prefect tasks directly via .fn() for debugging and testing
Understand training_flow / retraining_flow and read their typed return values
Configure the MLOps layer via TwigaSettings environment variables
The MLOps lifecycle
Fit model → Track (MLflow) → Checkpoint → Serve (FastAPI)
                                              ↓
            Retraining  ←  Monitor (Evidently)
All heavy dependencies are part of the [mlops] extra:
pip install twiga[mlops]
1. Setup#
from pathlib import Path
from great_tables import GT, md
from IPython.display import clear_output
from lets_plot import LetsPlot
import pandas as pd
from sklearn.preprocessing import RobustScaler, StandardScaler
LetsPlot.setup_html()
# warnings.filterwarnings("ignore")
from twiga import TwigaForecaster
from twiga.core.config import DataPipelineConfig, ForecasterConfig
from twiga.core.plot import plot_timeseries
from twiga.core.plot.gt import twiga_gt, twiga_report
from twiga.core.utils import configure, get_logger
from twiga.models.baseline.seasonal_naive_model import SEASONALNAIVEConfig
from twiga.models.ml import LIGHTGBMConfig
configure()
log = get_logger("tutorial")
# Directories used throughout this notebook
CHECKPOINT_DIR = Path("mlops_demo/checkpoints")
REPORT_DIR = Path("mlops_demo/reports")
DATA_DIR = Path("mlops_demo/data")
for d in [CHECKPOINT_DIR, REPORT_DIR, DATA_DIR]:
d.mkdir(parents=True, exist_ok=True)
log.info("Setup complete. Directories: %s", [str(d) for d in [CHECKPOINT_DIR, REPORT_DIR, DATA_DIR]])
2026-05-03 16:46:17 | INFO | twiga.tutorial | Setup complete. Directories: ['mlops_demo/checkpoints', 'mlops_demo/reports', 'mlops_demo/data']
2. Load data#
We use the same MLVS-PT dataset as Tutorial 01 - Madeira, Portugal at 30-minute resolution. The splits mirror Tutorial 01 so you can compare results directly.
| Split | Period | Purpose |
|---|---|---|
| Train | 2019-02-01 → 2019-12-31 | Model learning |
| Validation | 2020-01-01 → 2020-06-30 | Early-stopping guard |
| Test | 2020-07-01 → 2020-12-31 | Final honest evaluation |
Why the same dataset? MLOps tooling is independent of the model - the same TwigaTracker, ForecastMonitor, and pipeline flows work with any TwigaForecaster. Using a familiar dataset lets you focus on the MLOps concepts without new data concerns.
data = pd.read_parquet("../data/MLVS-PT.parquet")
data = data[["timestamp", "NetLoad(kW)", "Ghi", "Temperature"]]
data["timestamp"] = pd.to_datetime(data["timestamp"])
data = data.drop_duplicates(subset="timestamp").reset_index(drop=True)
# Restrict to 2019-2020 to keep tutorial execution fast
data = data[(data["timestamp"] >= "2019-01-01") & (data["timestamp"] < "2021-01-01")].reset_index(drop=True)
train_df = data[data["timestamp"] < "2020-01-01"].reset_index(drop=True)
val_df = data[(data["timestamp"] >= "2020-01-01") & (data["timestamp"] < "2020-07-01")].reset_index(drop=True)
test_df = data[data["timestamp"] >= "2020-07-01"].reset_index(drop=True)
log.info(
"train : %d rows (%s → %s)", len(train_df), train_df["timestamp"].min().date(), train_df["timestamp"].max().date()
)
log.info("val : %d rows (%s → %s)", len(val_df), val_df["timestamp"].min().date(), val_df["timestamp"].max().date())
log.info(
"test : %d rows (%s → %s)", len(test_df), test_df["timestamp"].min().date(), test_df["timestamp"].max().date()
)
# Persist the splits (and the full series, used by the flows in section 9) so pipeline tasks can read from disk
train_df.to_parquet(DATA_DIR / "train.parquet", index=False)
test_df.to_parquet(DATA_DIR / "test.parquet", index=False)
data.to_parquet(DATA_DIR / "full.parquet", index=False)
log.info("Splits saved to %s", DATA_DIR)
3. Configure and fit the forecaster#
The configuration mirrors Tutorial 01. ForecasterConfig.checkpoints_path tells
the forecaster where to write the model artefact - this is the directory that
ForecastMonitor, ModelLoader, and retraining_flow all read from.
data_config = DataPipelineConfig(
target_feature="NetLoad(kW)",
period="30min",
latitude=32.371666,
longitude=-16.274998,
calendar_features=["hour", "day_night"],
exogenous_features=["Ghi", "Temperature"],
forecast_horizon=48, # 24 h at 30-min resolution
    lookback_window_size=48 * 7,  # one week of history (336 steps)
input_scaler=StandardScaler(),
target_scaler=RobustScaler(),
    stride=48,  # non-overlapping windows (48 steps = 24 h at 30-min resolution)
)
forecaster_config = ForecasterConfig(
split_freq="months",
train_size=3,
test_size=1,
checkpoints_path=str(CHECKPOINT_DIR), # where on_save_checkpoint writes
)
model_config = LIGHTGBMConfig()
seasonal_config = SEASONALNAIVEConfig(period="7D", freq="30min")
forecaster = TwigaForecaster(
data_params=data_config,
    model_params=[model_config, seasonal_config],  # LightGBM plus the seasonal-naive baseline
train_params=forecaster_config,
)
log.info("Forecaster assembled — checkpoint dir: %s", forecaster.checkpoints_path)
4. Experiment tracking with TwigaTracker#
TwigaTracker is a context manager around an MLflow run. Inside the with block,
five methods cover the full tracking workflow:
| Step | Method | What it logs |
|---|---|---|
| 1 | log_dataset | Dataset name, shape, and source URI |
| 2 | log_forecaster_metadata | All Pydantic config fields as flat MLflow params |
| 3 | log_model | Checkpoint directory as an MLflow artefact (with conda env) |
| 4 | log_evaluation | Per-fold metrics and predictions DataFrame |
| 5 | run_id (property) | The MLflow run ID for downstream steps |
MLflow UI After running the cell, start mlflow ui --port 5000 in a terminal and open http://localhost:5000 to inspect the logged artefacts.
from twiga.tracking import TwigaTracker
with TwigaTracker(
    experiment="tutorial-17",
    run_name="lgbm-vs-seasonal",
tags={"dataset": "MLVS-PT", "horizon": "48-step", "tutorial": "17"},
) as tracker:
# Step 1 — dataset lineage
tracker.log_dataset(
train_df,
name="MLVS-PT-train",
source="../data/MLVS-PT.parquet",
)
forecaster.fit(train_df=train_df, val_df=val_df)
clear_output()
log.info("Training complete.")
# Step 2 — hyperparameters and configs
tracker.log_forecaster_metadata(forecaster)
# Step 3 — model artefact
tracker.log_model(forecaster, sample_input=train_df.head(500))
# Step 4 — evaluation
predictions_df, metrics_df = forecaster.evaluate_point_forecast(test_df=test_df, ensemble_strategy="mean")
tracker.log_evaluation(metrics_df, results_df=predictions_df)
mlflow_run_id = tracker.run_id
log.info("MLflow run: %s", mlflow_run_id)
#!mlflow ui --port 5000
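Because tracker.run_id is a plain MLflow run ID, you can pull the logged params and metrics back later with the standard MLflow client - this uses mlflow's public API directly, independent of twiga:
from mlflow.tracking import MlflowClient

mlf_client = MlflowClient()  # honours the configured tracking URI (see Section 10)
run = mlf_client.get_run(mlflow_run_id)
print(run.data.params)   # flat config fields logged by log_forecaster_metadata
print(run.data.metrics)  # metrics logged by log_evaluation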
Inspect the metrics#
res = (
metrics_df.groupby("Model")[["mae", "corr", "nbias", "rmse", "wmape", "smape"]]
.mean()
.round(2)
.reset_index()
.rename(
columns={"mae": "MAE", "corr": "Corr", "wmape": "WMAPE", "smape": "SMAPE", "nbias": "NBIAS", "rmse": "RMSE"}
)
)
twiga_report(
res,
metrics=["MAE", "Corr", "SMAPE", "RMSE"],
minimize_cols=["MAE", "SMAPE", "RMSE"],
maximize_cols=["Corr"],
)
5. Versioned checkpoints#
on_save_checkpoint() persists the fitted model to disk and writes (or updates)
_checkpoint_manifest.json - a small JSON file that records the version number,
model type, and UTC timestamp of every save.
{
"version": 1,
"checkpoint_file": "model_and_pipeline.pkl",
"model_type": "lightgbm",
"saved_at": "2024-06-01T12:00:00+00:00"
}
on_load_checkpoint() reads the manifest first; if absent it falls back to the
most-recently-modified *.pkl file for backward compatibility. Using the manifest
makes rollbacks auditable and removes the fragility of mtime-sorting.
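Conceptually, the resolution order looks like this - a minimal sketch of the fallback logic (illustrative only; resolve_checkpoint is a hypothetical helper, not twiga's actual source):
import json
from pathlib import Path

def resolve_checkpoint(ckpt_dir: Path) -> Path:
    manifest = ckpt_dir / "_checkpoint_manifest.json"
    if manifest.exists():
        # Manifest present: trust the recorded checkpoint file
        return ckpt_dir / json.loads(manifest.read_text())["checkpoint_file"]
    # Legacy fallback: newest .pkl by modification time (fragile)
    return max(ckpt_dir.glob("*.pkl"), key=lambda p: p.stat().st_mtime)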
import json
# Persist the trained model
checkpoint_path = forecaster.on_save_checkpoint()
log.info("Checkpoint saved: %s", checkpoint_path)
# Inspect the manifest
manifest_path = Path(checkpoint_path) / "_checkpoint_manifest.json"
if manifest_path.exists():
manifest = json.loads(manifest_path.read_text())
log.info(
"Manifest: version=%d model_type=%s saved_at=%s",
manifest["version"],
manifest["model_type"],
manifest["saved_at"],
)
else:
log.warning("Manifest not found at %s", manifest_path)
# Create a fresh (unfitted) forecaster and load the checkpoint
loaded_forecaster = TwigaForecaster(
data_params=data_config,
    model_params=[model_config, seasonal_config],  # must match the checkpointed configuration
train_params=forecaster_config,
)
loaded_forecaster.on_load_checkpoint()
log.info("Checkpoint loaded — model ready: %s", loaded_forecaster.model is not None)
6. Production monitoring with ForecastMonitor#
ForecastMonitor wraps Evidently to run two types
of production checks:
| Method | What it checks | Trigger |
|---|---|---|
| run_data_drift | Distribution shift in input features | Weekly / on new data |
| run_performance | MAE / RMSE degradation vs reference | After each prediction batch |
Both methods write an HTML report to report_dir and return a summary dict that
retraining_flow uses to decide whether to retrain.
Workflow#
monitor.set_reference(train_df) # once, at deployment time
↓
monitor.run_data_drift(current_df) # drift_detected? → retrain
monitor.run_performance(actuals_df) # mae / rmse degraded? → retrain
from twiga.serve import ForecastMonitor
monitor = ForecastMonitor(
report_dir=str(REPORT_DIR),
target_col="NetLoad(kW)",
prediction_col="pred_NetLoad",
)
monitor.set_reference(train_df)
log.info("Reference set — %d rows from training split.", len(train_df))
import numpy as np
rng = np.random.default_rng(42)
# Simulate a production window with a distribution shift in Temperature
# (e.g. a heat wave that pushes temperatures +4 °C above the historical baseline)
current_df = test_df.copy()
current_df["Temperature"] = current_df["Temperature"] + rng.normal(4, 1.5, len(current_df))
_, drift_summary = monitor.run_data_drift(
current_df,
feature_cols=["Temperature", "Ghi"],
)
log.info("Drift detected : %s", drift_summary["drift_detected"])
log.info("Dataset score : %.3f", drift_summary["dataset_drift_score"])
log.info("Drifted features : %d / %d", drift_summary["n_drifted"], drift_summary["total"])
for feat, score in drift_summary.get("feature_drift", {}).items():
log.info(" %-15s p-value = %.4f", feat, score)
# Performance monitoring — add a prediction column to a small batch
# We use the loaded_forecaster from section 5 to show round-trip checkpoint → predict
batch = test_df.head(1000).copy()
preds_arr, _ = loaded_forecaster.predict(batch)
# preds_arr values are 3-D arrays: (n_windows, horizon, n_targets).
# Each window i predicts `horizon` steps starting at row `lookback + i * stride`
# in the original batch. We take the first horizon step of every window and
# align each prediction to its matching batch row.
first_model_key = list(preds_arr.keys())[0]
pred = preds_arr[first_model_key] # (n_windows, horizon, 1)
lookback = data_config.lookback_window_size
stride = data_config.stride
n_windows = pred.shape[0]
pred_row_indices = [lookback + i * stride for i in range(n_windows)]
monitor_batch = batch.iloc[pred_row_indices].copy()
monitor_batch["pred_NetLoad"] = pred[:, 0, 0]
perf_summary = monitor.run_performance(monitor_batch)
log.info("MAE : %.4f kW", perf_summary.get("mae", float("nan")))
log.info("RMSE : %.4f kW", perf_summary.get("rmse", float("nan")))
log.info("Report → %s", perf_summary.get("report_path", "n/a"))
7. Model serving with FastAPI#
create_app() is a factory that returns a configured FastAPI application.
It wires together:
ModelLoader: thread-safe lazy checkpoint loader with double-checked locking
ForecastMonitor: Evidently drift and performance checks exposed as HTTP endpoints
CPU offloading: fc.predict() runs in a thread pool via anyio.to_thread.run_sync so the async event loop stays responsive under load (sketched below)
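To make the offloading point concrete, here is a minimal sketch of the pattern - illustrative only, not twiga's actual source; it reuses the notebook's fitted forecaster:
import anyio
from fastapi import FastAPI

sketch_app = FastAPI()

@sketch_app.post("/predict")
async def predict_sketch(body: dict):
    batch = pd.DataFrame.from_records(body["records"])
    batch["timestamp"] = pd.to_datetime(batch["timestamp"])
    # forecaster.predict is CPU-bound; running it in a worker thread keeps
    # the event loop free to accept other requests meanwhile.
    preds, _ = await anyio.to_thread.run_sync(forecaster.predict, batch)
    return {"models": list(preds.keys())}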
Endpoints#
| Method | Path | Description |
|---|---|---|
| GET | /health | Model names, horizon, targets, load status |
| POST | /predict | Point forecast from raw records |
| POST | /predict-interval | Interval forecast (requires probabilistic model) |
| POST | /monitor/drift | Run data drift report on the request payload |
| GET | /monitor/performance | Latest performance summary |
| POST | /reload | Hot-reload model from latest checkpoint |
The cells below use FastAPI’s TestClient with the app mounted in-process (no running server needed).
from twiga.serve import create_app
app = create_app(
forecaster=forecaster,
report_dir=str(REPORT_DIR),
title="Net Load Forecast API — Tutorial 17",
)
log.info("FastAPI app created — routes: %s", [r.path for r in app.routes])
from fastapi.testclient import TestClient
client = TestClient(app)
# Health check — confirms the model is loaded and lists capabilities
resp = client.get("/health")
health = resp.json()
twiga_gt(
GT(
pd.DataFrame(
[
{"Field": "status", "Value": health.get("status")},
{"Field": "models", "Value": str(health.get("models"))},
{"Field": "forecast_horizon", "Value": str(health.get("forecast_horizon"))},
{"Field": "targets", "Value": str(health.get("targets"))},
]
)
)
.tab_header(title=md("**GET /health**"), subtitle="API capability report")
.cols_label(Field=md("**Field**"), Value=md("**Value**")),
n_rows=4,
)
# Point forecast — POST /predict
# Convert timestamps to ISO strings so JSON serialisation works
batch_records = (
test_df.head(1000)
.assign(timestamp=test_df.head(1000)["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%S"))
.to_dict("records")
)
resp = client.post("/predict", json={"records": batch_records, "ensemble_strategy": "mean"})
resp.raise_for_status()
forecast_resp = resp.json()
log.info("Response keys : %s", list(forecast_resp.keys()))
log.info("Models returned: %d", len(forecast_resp["forecasts"]))
for f in forecast_resp["forecasts"]:
log.info(" %-20s predictions shape: %s", f["model"], str(len(f["predictions"])))
# Data drift via API — POST /monitor/drift
drift_records = (
current_df.head(500)
.assign(timestamp=current_df.head(500)["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%S"))
.to_dict("records")
)
resp = client.post("/monitor/drift", json={"records": drift_records})
drift_api = resp.json()
log.info("drift_detected : %s", drift_api.get("drift_detected"))
log.info("dataset_drift_score: %.3f", drift_api.get("dataset_drift_score", 0))
log.info("n_drifted : %d", drift_api.get("n_drifted", 0))
# Hot reload — POST /reload
# In production, retraining_flow calls this after promoting a new model.
# Here we trigger it manually to verify the endpoint works.
resp = client.post("/reload")
log.info("Reload status: %d body: %s", resp.status_code, resp.json())
Running in production#
To serve real traffic, write the app to a module and start uvicorn:
# serve.py
from sklearn.preprocessing import RobustScaler, StandardScaler

from twiga import TwigaForecaster
from twiga.core.config import DataPipelineConfig, ForecasterConfig
from twiga.models.ml import LIGHTGBMConfig
from twiga.serve import create_app
data_config = DataPipelineConfig(
target_feature="NetLoad(kW)",
period="30min",
latitude=32.371666,
longitude=-16.274998,
calendar_features=["hour", "day_night"],
exogenous_features=["Ghi", "Temperature"],
forecast_horizon=48,
lookback_window_size=96,
input_scaler=StandardScaler(),
target_scaler=RobustScaler(),
)
train_params = ForecasterConfig(
split_freq="months", train_size=3, test_size=1,
checkpoints_path="mlops_demo/checkpoints",
)
forecaster = TwigaForecaster(
data_params=data_config, model_params=[LIGHTGBMConfig()], train_params=train_params
)
app = create_app(forecaster, report_dir="mlops_demo/reports", title="Net Load Forecast API")
uvicorn serve:app --host 0.0.0.0 --port 8000 --workers 2
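Once the server is up, any HTTP client can call it - for example with requests, reusing the payload shape from the in-process cells above (assumes the server is listening on localhost:8000):
import requests

payload = {"records": batch_records, "ensemble_strategy": "mean"}
resp = requests.post("http://localhost:8000/predict", json=payload, timeout=30)
resp.raise_for_status()
print(len(resp.json()["forecasts"]), "model(s) returned")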
Secure the API by setting TWIGA_SERVE_API_KEY - see Section 10.
8. Pipeline tasks: direct invocation#
Every Prefect task in twiga.pipeline.tasks exposes its underlying Python function as .fn, which you can call without a running Prefect server. This is ideal for:
Debugging a single step without triggering the full flow
Running tasks inside notebooks or unit tests
Composing ad-hoc pipelines outside of Prefect
Convention: task.fn(...) bypasses Prefect’s retry logic, logging, and state tracking - it is the raw Python function.
import logging
from unittest.mock import patch
plain_logger = logging.getLogger("tutorial")
# Patch get_run_logger so tasks work without a running Prefect server
with patch("twiga.pipeline.tasks.get_run_logger", return_value=plain_logger):
from twiga.pipeline.tasks import (
evaluate_model,
fit_forecaster,
load_data,
run_drift_check,
save_checkpoint,
)
# 1. load_data — read parquet, parse timestamps, split train/test
tr, te = load_data.fn(
data_path=DATA_DIR / "train.parquet",
date_col="timestamp",
)
log.info("load_data → train %d rows | test %d rows", len(tr), len(te))
# 2. fit_forecaster
fresh_forecaster = TwigaForecaster(
data_params=data_config,
model_params=[model_config],
train_params=forecaster_config,
)
fitted = fit_forecaster.fn(forecaster=fresh_forecaster, train_df=tr, tune=False)
clear_output()
log.info("fit_forecaster → model fitted: %s", fitted.model is not None)
# 3. evaluate_model
preds_df, met_df = evaluate_model.fn(forecaster=fitted, test_df=te)
log.info("evaluate_model → metrics: %s", met_df[["mae", "rmse"]].mean().to_dict())
# 4. save_checkpoint
ckpt = save_checkpoint.fn(forecaster=fitted)
log.info("save_checkpoint → %s", ckpt)
# 5. run_drift_check (reference = train, current = shifted test)
drift_raw = run_drift_check.fn(
reference_df=tr,
current_df=current_df.head(len(tr)),
report_dir=str(REPORT_DIR),
)
log.info("run_drift_check → detected=%s score=%.3f", drift_raw["drift_detected"], drift_raw["dataset_drift_score"])
9. Full Prefect flows#
In production you schedule training_flow and retraining_flow as Prefect deployments.
Both flows return typed dataclasses - use attribute access, not dict keys.
training_flow: full train → evaluate → checkpoint → MLflow#
from twiga.pipeline import training_flow
from twiga.pipeline.schemas import TrainingResult
result: TrainingResult = training_flow(
forecaster=forecaster,
    data_path="mlops_demo/data/full.parquet",
    date_col="timestamp",
    train_end="2020-06-30",
    test_start="2020-07-01",
tune=False,
experiment="tutorial-17",
run_name="lgbm-flow-v1",
)
# Attribute access - not result["checkpoint_path"]
print(result.checkpoint_path) # "/path/to/checkpoints/"
print(result.mlflow_run_id) # "a1b2c3d4e5f6"
print(result.metrics) # {"lightgbm": {"mae": 4.23, "rmse": 6.11, "corr": 0.97}}
retraining_flow: drift-triggered retrain with promotion logic#
from twiga.pipeline import retraining_flow
from twiga.pipeline.schemas import RetrainingResult, DriftSummary
result: RetrainingResult = retraining_flow(
forecaster=forecaster,
reference_data_path="mlops_demo/data/train.parquet",
current_data_path="mlops_demo/data/test.parquet",
drift_threshold=0.5, # retrain if > 50% of features drifted
performance_threshold=None, # drift check only in this example
metric_key="mae",
minimize=True,
experiment="tutorial-17-retrain",
run_name="lgbm-retrain-v1",
fastapi_reload_url="http://localhost:8000", # POST /reload after promotion
)
# Typed return - no dict-key guesswork
print(result.retrained) # True
print(result.promoted) # True
print(result.checkpoint_path) # "/path/to/checkpoints/"
print(result.mlflow_run_id) # "f7e8d9c0b1a2"
print(result.metrics) # {"lightgbm": {"mae": 3.91, ...}}
# DriftSummary - rich drift context
ds: DriftSummary = result.drift_summary
print(ds.drift_detected) # True
print(ds.dataset_drift_score) # 0.72
print(ds.n_drifted) # 1
print(ds.feature_drift) # {"Temperature": 0.008, "Ghi": 0.41}
print(ds.report_path) # "/mlops_demo/reports/data_drift_20240601T120000Z.html"
Scheduling with Prefect deployments#
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from twiga.pipeline import retraining_flow
deployment = Deployment.build_from_flow(
flow=retraining_flow,
name="weekly-retrain",
schedule=CronSchedule(cron="0 2 * * 1"), # Every Monday at 02:00
parameters={
"reference_data_path": "data/train.parquet",
"current_data_path": "data/current.parquet",
"experiment": "load-forecast-retrain",
"fastapi_reload_url": "http://api:8000",
},
)
deployment.apply()
prefect agent start --work-queue default
retraining_flow decision logic#
load reference + current data
          ↓
run_drift_check → DriftSummary
          ↓
drift_fraction > threshold? ──Yes──────────────┐
          │ No                                 │
evaluate incumbent                             │
          ↓                                    │
perf_threshold breached? ──Yes──→ fit new model on current data
          │ No                                 ↓
Return retrained=False               evaluate new model
                                               ↓
                     new_metric < incumbent? ──Yes──→ save_checkpoint
                                               │ No         ↓
                     Return promoted=False    POST /reload → Return promoted=True
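The final comparison reduces to a one-line rule implied by the metric_key / minimize parameters above (a sketch, not twiga source):
def is_better(challenger: float, incumbent: float, minimize: bool = True) -> bool:
    # With metric_key="mae" and minimize=True, promote only on strict improvement
    return challenger < incumbent if minimize else challenger > incumbent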
10. Configuration: TwigaSettings#
TwigaSettings is a pydantic-settings model that reads TWIGA_* environment variables
(or a .env file). All MLOps components use it to pick up runtime config without
hardcoded values - 12-factor app style.
from twiga.core.settings import TwigaSettings
settings = TwigaSettings() # reads env or .env file
print(settings.mlflow_tracking_uri) # http://localhost:5000
print(settings.model_checkpoint_dir) # checkpoints
| Environment variable | Default | Used by |
|---|---|---|
| TWIGA_MLFLOW_TRACKING_URI | http://localhost:5000 | TwigaTracker |
| TWIGA_MLFLOW_EXPERIMENT |  | TwigaTracker |
| TWIGA_SERVE_API_KEY | (empty - open access) | API-key auth in create_app() |
| TWIGA_MODEL_CHECKPOINT_DIR | checkpoints | ModelLoader / on_save_checkpoint |
| TWIGA_SERVE_REPORT_DIR |  | ForecastMonitor |
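pydantic-settings reads the environment when the settings object is constructed, so an in-process override is visible to anything created afterwards - a quick way to sanity-check a value before deploying:
import os

from twiga.core.settings import TwigaSettings

os.environ["TWIGA_MODEL_CHECKPOINT_DIR"] = "/tmp/twiga-ckpts"
print(TwigaSettings().model_checkpoint_dir)  # /tmp/twiga-ckpts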
Example: lock the API behind a secret key#
export TWIGA_SERVE_API_KEY="my-secret-token"
# All /predict and /monitor requests now require:
# Header: X-API-Key: my-secret-token
import requests

headers = {"X-API-Key": "my-secret-token"}
resp = requests.post("http://localhost:8000/predict", json=payload, headers=headers)  # payload as in the /predict example
Example: remote MLflow tracking#
export TWIGA_MLFLOW_TRACKING_URI="http://mlflow.internal:5000"
export TWIGA_MLFLOW_EXPERIMENT="production-load-forecast"
All TwigaTracker calls automatically use the remote server without any code changes.
from twiga.core.settings import TwigaSettings
settings = TwigaSettings()
settings_df = pd.DataFrame(
[
{"Variable": "TWIGA_MLFLOW_TRACKING_URI", "Current value": str(settings.mlflow_tracking_uri)},
{"Variable": "TWIGA_MLFLOW_EXPERIMENT", "Current value": str(settings.mlflow_experiment)},
{
"Variable": "TWIGA_SERVE_API_KEY",
"Current value": "(empty — open access)" if not settings.serve_api_key else "***",
},
{"Variable": "TWIGA_MODEL_CHECKPOINT_DIR", "Current value": str(settings.model_checkpoint_dir)},
{"Variable": "TWIGA_SERVE_REPORT_DIR", "Current value": str(settings.serve_report_dir)},
]
)
twiga_gt(
GT(settings_df)
.tab_header(
title=md("**TwigaSettings — active configuration**"),
subtitle="Override any value via environment variable or .env file",
)
.cols_label(Variable=md("**Environment variable**"), **{"Current value": md("**Current value**")})
.tab_source_note("All variables use the TWIGA_ prefix and are read via pydantic-settings"),
n_rows=len(settings_df),
)
11. API summary#
api_df = pd.DataFrame(
{
"Module": [
"twiga.tracking",
"twiga.tracking",
"twiga.forecaster",
"twiga.forecaster",
"twiga.serve",
"twiga.serve",
"twiga.serve",
"twiga.serve",
"twiga.pipeline",
"twiga.pipeline",
"twiga.pipeline",
"twiga.core",
],
"Class / Function": [
"TwigaTracker",
"TwigaTracker.log_*",
"on_save_checkpoint()",
"on_load_checkpoint()",
"create_app()",
"ModelLoader",
"ForecastMonitor",
"ForecastMonitor.run_data_drift",
"training_flow",
"retraining_flow",
"task.fn(...)",
"TwigaSettings",
],
"What it does": [
"Context manager around an MLflow run",
"Log dataset lineage, configs, model artefact, evaluation metrics",
"Persist fitted model + write versioned _checkpoint_manifest.json",
"Read manifest → load model; fallback to newest .pkl if no manifest",
"FastAPI factory — wires ModelLoader, ForecastMonitor, CPU offloading",
"Thread-safe lazy checkpoint loader (double-checked locking)",
"Evidently wrapper for drift and performance monitoring",
"Detect distribution shift in input features; write HTML report",
"Full Prefect flow: load → fit → evaluate → checkpoint → MLflow",
"Drift-aware Prefect flow: check → retrain → promote if better",
"Run a single Prefect task outside a flow (notebooks / tests)",
"12-factor env-var config for all MLOps components",
],
}
)
twiga_gt(
GT(api_df)
.tab_header(
title=md("**Tutorial 17 — MLOps API Quick Reference**"),
subtitle="All classes and functions introduced in this notebook",
)
.cols_label(
Module=md("**Module**"),
**{"Class / Function": md("**Class / Function**"), "What it does": md("**What it does**")},
)
.tab_source_note("Full API docs → twiga-forecast.readthedocs.io/mlops"),
n_rows=len(api_df),
)
Wrapping up#
What you did
Tracked an experiment end-to-end with TwigaTracker (dataset, params, model, metrics)
Saved a versioned checkpoint and round-tripped it with on_save_checkpoint / on_load_checkpoint
Detected a simulated temperature distribution shift with ForecastMonitor.run_data_drift
Served forecasts over HTTP with create_app() and called all six endpoints
Ran individual pipeline tasks via .fn() without a Prefect server
Read typed TrainingResult / RetrainingResult / DriftSummary return values
Reviewed TwigaSettings env-var configuration for 12-factor deployments
Key takeaways
TwigaTracker is a five-step workflow: dataset → metadata → model → evaluation → run_id.
Every checkpoint is versioned via _checkpoint_manifest.json - rollbacks are auditable.
create_app() handles thread safety and CPU offloading automatically - don’t call fc.predict() directly from async endpoints.
Pipeline flows return dataclasses - use result.checkpoint_path, not result["checkpoint_path"].
All runtime config lives in TwigaSettings - never hardcode URIs or secrets in code.
What’s next?
Tutorial 10: Hyperparameter tuning with Optuna (tune=True in training_flow)
Tutorial 11: Conformal prediction for calibrated interval forecasts served via /predict-interval
Tutorial 12: Neural network forecasters (NHITS, MLP) as drop-in replacements in the same MLOps pipeline