MLOps: Tracking, Serving & Monitoring#



What you’ll build

A complete MLOps pipeline around the LightGBM net-load forecaster from Tutorial 01: from raw experiment tracking all the way through drift-triggered retraining and hot-model-reload in a FastAPI server.

Prerequisites

  • Tutorial 01 (Getting Started) - you should know how to configure and fit a TwigaForecaster

  • Basic HTTP / REST concepts (GET, POST, JSON)

Learning objectives

By the end of this notebook you will be able to:

  1. Track experiments with TwigaTracker (dataset lineage, params, metrics, artefacts)

  2. Save and load versioned checkpoints via on_save_checkpoint / on_load_checkpoint

  3. Detect data drift and monitor forecast performance with ForecastMonitor

  4. Serve forecasts over HTTP with create_app() (FastAPI) and exercise its endpoints

  5. Run Prefect tasks directly via .fn() for debugging and testing

  6. Understand training_flow / retraining_flow and read their typed return values

  7. Configure the MLOps layer via TwigaSettings environment variables

The MLOps lifecycle

Fit model  →  Track (MLflow)  →  Checkpoint  →  Serve (FastAPI)
                                                      ↓
                               Retraining  ←  Monitor (Evidently)

All heavy dependencies are part of the [mlops] extra:

pip install twiga[mlops]

1. Setup#

from pathlib import Path

from great_tables import GT, md
from IPython.display import clear_output
from lets_plot import LetsPlot
import pandas as pd
from sklearn.preprocessing import RobustScaler, StandardScaler

LetsPlot.setup_html()
# warnings.filterwarnings("ignore")

from twiga import TwigaForecaster
from twiga.core.config import DataPipelineConfig, ForecasterConfig
from twiga.core.plot import plot_timeseries
from twiga.core.plot.gt import twiga_gt, twiga_report
from twiga.core.utils import configure, get_logger
from twiga.models.baseline.seasonal_naive_model import SEASONALNAIVEConfig
from twiga.models.ml import LIGHTGBMConfig

configure()
log = get_logger("tutorial")

# Directories used throughout this notebook
CHECKPOINT_DIR = Path("mlops_demo/checkpoints")
REPORT_DIR = Path("mlops_demo/reports")
DATA_DIR = Path("mlops_demo/data")

for d in [CHECKPOINT_DIR, REPORT_DIR, DATA_DIR]:
    d.mkdir(parents=True, exist_ok=True)

log.info("Setup complete.  Directories: %s", [str(d) for d in [CHECKPOINT_DIR, REPORT_DIR, DATA_DIR]])
2026-05-03 16:46:17 | INFO     | twiga.tutorial | Setup complete.  Directories: ['mlops_demo/checkpoints', 'mlops_demo/reports', 'mlops_demo/data']

2. Load data#

We use the same MLVS-PT dataset as Tutorial 01 - Madeira, Portugal at 30-minute resolution. The splits mirror Tutorial 01 so you can compare results directly.

| Split      | Period                  | Purpose                 |
|------------|-------------------------|-------------------------|
| Train      | 2019-01-01 → 2019-12-31 | Model learning          |
| Validation | 2020-01-01 → 2020-06-30 | Early-stopping guard    |
| Test       | 2020-07-01 → 2020-12-31 | Final honest evaluation |

Why the same dataset? MLOps tooling is independent of the model - the same TwigaTracker, ForecastMonitor, and pipeline flows work with any TwigaForecaster. Using a familiar dataset lets you focus on the MLOps concepts without new data concerns.

data = pd.read_parquet("../data/MLVS-PT.parquet")
data = data[["timestamp", "NetLoad(kW)", "Ghi", "Temperature"]]
data["timestamp"] = pd.to_datetime(data["timestamp"])
data = data.drop_duplicates(subset="timestamp").reset_index(drop=True)
# Restrict to 2019-2020 to keep tutorial execution fast
data = data[(data["timestamp"] >= "2019-01-01") & (data["timestamp"] <= "2020-12-31")].reset_index(drop=True)

train_df = data[data["timestamp"] < "2020-01-01"].reset_index(drop=True)
val_df = data[(data["timestamp"] >= "2020-01-01") & (data["timestamp"] < "2020-07-01")].reset_index(drop=True)
test_df = data[data["timestamp"] >= "2020-07-01"].reset_index(drop=True)

log.info(
    "train : %d rows  (%s → %s)", len(train_df), train_df["timestamp"].min().date(), train_df["timestamp"].max().date()
)
log.info("val   : %d rows  (%s → %s)", len(val_df), val_df["timestamp"].min().date(), val_df["timestamp"].max().date())
log.info(
    "test  : %d rows  (%s → %s)", len(test_df), test_df["timestamp"].min().date(), test_df["timestamp"].max().date()
)

# Persist splits so pipeline tasks can read from disk
train_df.to_parquet(DATA_DIR / "train.parquet", index=False)
test_df.to_parquet(DATA_DIR / "test.parquet", index=False)
log.info("Splits saved to %s", DATA_DIR)

3. Configure and fit the forecaster#

The configuration mirrors Tutorial 01. ForecasterConfig.checkpoints_path tells the forecaster where to write the model artefact - this is the directory that ForecastMonitor, ModelLoader, and retraining_flow all read from.

data_config = DataPipelineConfig(
    target_feature="NetLoad(kW)",
    period="30min",
    latitude=32.371666,
    longitude=-16.274998,
    calendar_features=["hour", "day_night"],
    exogenous_features=["Ghi", "Temperature"],
    forecast_horizon=48,  # 24 h at 30-min resolution
    lookback_window_size=48 * 7,  # 7 days of history (336 steps at 30-min resolution)
    input_scaler=StandardScaler(),
    target_scaler=RobustScaler(),
    stride=48,  # non-overlapping windows (48 steps = 24 h at 30-min resolution)
)

forecaster_config = ForecasterConfig(
    split_freq="months",
    train_size=3,
    test_size=1,
    checkpoints_path=str(CHECKPOINT_DIR),  # where on_save_checkpoint writes
)

model_config = LIGHTGBMConfig()
seasonal_config = SEASONALNAIVEConfig(period="7D", freq="30min")
forecaster = TwigaForecaster(
    data_params=data_config,
    model_params=[seasonal_config],
    train_params=forecaster_config,
)

log.info("Forecaster assembled — checkpoint dir: %s", forecaster.checkpoints_path)

4. Experiment tracking with TwigaTracker#

TwigaTracker is a context manager around an MLflow run. Inside the with block, five methods cover the full tracking workflow:

| Step | Method                  | What it logs                                                |
|------|-------------------------|-------------------------------------------------------------|
| 1    | log_dataset             | Dataset name, shape, and source URI                         |
| 2    | log_forecaster_metadata | All Pydantic config fields as flat MLflow params            |
| 3    | log_model               | Checkpoint directory as an MLflow artefact (with conda env) |
| 4    | log_evaluation          | Per-fold metrics and predictions DataFrame                  |
| 5    | run_id (property)       | The MLflow run ID for downstream steps                      |

MLflow UI: After running the cell, start mlflow ui --port 5000 in a terminal and open http://localhost:5000 to inspect the logged artefacts.

from twiga.tracking import TwigaTracker

with TwigaTracker(
    experiment="tutorial-seasonal",
    run_name="seasonal-naive",
    tags={"dataset": "MLVS-PT", "horizon": "48-step", "tutorial": "17"},
) as tracker:
    # Step 1 — dataset lineage
    tracker.log_dataset(
        train_df,
        name="MLVS-PT-train",
        source="../data/MLVS-PT.parquet",
    )
    forecaster.fit(train_df=train_df, val_df=val_df)
    clear_output()
    log.info("Training complete.")
    # Step 2 — hyperparameters and configs
    tracker.log_forecaster_metadata(forecaster)

    # Step 3 — model artefact
    tracker.log_model(forecaster, sample_input=train_df.head(500))

    # Step 4 — evaluation
    predictions_df, metrics_df = forecaster.evaluate_point_forecast(test_df=test_df, ensemble_strategy="mean")
    tracker.log_evaluation(metrics_df, results_df=predictions_df)

    mlflow_run_id = tracker.run_id

log.info("MLflow run: %s", mlflow_run_id)
#!mlflow ui --port 5000

Inspect the metrics#

res = (
    metrics_df.groupby("Model")[["mae", "corr", "nbias", "rmse", "wmape", "smape"]]
    .mean()
    .round(2)
    .reset_index()
    .rename(
        columns={"mae": "MAE", "corr": "Corr", "wmape": "WMAPE", "smape": "SMAPE", "nbias": "NBIAS", "rmse": "RMSE"}
    )
)

twiga_report(
    res,
    metrics=["MAE", "Corr", "SMAPE", "RMSE"],
    minimize_cols=["MAE", "SMAPE", "RMSE"],
    maximize_cols=["Corr"],
)

5. Versioned checkpoints#

on_save_checkpoint() persists the fitted model to disk and writes (or updates) _checkpoint_manifest.json - a small JSON file that records the version number, model type, and UTC timestamp of every save.

{
  "version": 1,
  "checkpoint_file": "model_and_pipeline.pkl",
  "model_type": "lightgbm",
  "saved_at": "2024-06-01T12:00:00+00:00"
}

on_load_checkpoint() reads the manifest first; if absent it falls back to the most-recently-modified *.pkl file for backward compatibility. Using the manifest makes rollbacks auditable and removes the fragility of mtime-sorting.
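
For intuition, the load order can be written as a short sketch — resolve_checkpoint is a hypothetical helper illustrating the manifest-first rule, not the library's internal code:

import json
from pathlib import Path

def resolve_checkpoint(ckpt_dir: Path) -> Path:
    """Manifest-first resolution with an mtime fallback (illustrative sketch)."""
    manifest = ckpt_dir / "_checkpoint_manifest.json"
    if manifest.exists():
        # Auditable path: the manifest names the exact checkpoint file
        return ckpt_dir / json.loads(manifest.read_text())["checkpoint_file"]
    # Backward-compatible fallback: newest *.pkl by modification time
    pkls = sorted(ckpt_dir.glob("*.pkl"), key=lambda p: p.stat().st_mtime)
    if not pkls:
        raise FileNotFoundError(f"No checkpoint found in {ckpt_dir}")
    return pkls[-1]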

import json

# Persist the trained model
checkpoint_path = forecaster.on_save_checkpoint()
log.info("Checkpoint saved: %s", checkpoint_path)

# Inspect the manifest
manifest_path = Path(checkpoint_path) / "_checkpoint_manifest.json"
if manifest_path.exists():
    manifest = json.loads(manifest_path.read_text())
    log.info(
        "Manifest: version=%d  model_type=%s  saved_at=%s",
        manifest["version"],
        manifest["model_type"],
        manifest["saved_at"],
    )
else:
    log.warning("Manifest not found at %s", manifest_path)

# Create a fresh (unfitted) forecaster and load the checkpoint
loaded_forecaster = TwigaForecaster(
    data_params=data_config,
    model_params=[model_config],
    train_params=forecaster_config,
)

loaded_forecaster.on_load_checkpoint()
log.info("Checkpoint loaded — model ready: %s", loaded_forecaster.model is not None)

6. Production monitoring with ForecastMonitor#

ForecastMonitor wraps Evidently to run two types of production checks:

| Method          | What it checks                       | Trigger                     |
|-----------------|--------------------------------------|-----------------------------|
| run_data_drift  | Distribution shift in input features | Weekly / on new data        |
| run_performance | MAE / RMSE degradation vs reference  | After each prediction batch |

Both methods write an HTML report to report_dir and return a summary dict that retraining_flow uses to decide whether to retrain.
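
For orientation, the drift summary looks roughly like this (keys as used in the cells below; values are illustrative):

drift_summary = {
    "drift_detected": True,
    "dataset_drift_score": 0.72,
    "n_drifted": 1,
    "total": 2,
    "feature_drift": {"Temperature": 0.008, "Ghi": 0.41},
    "report_path": "mlops_demo/reports/data_drift_report.html",
}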

Workflow#

monitor.set_reference(train_df)          # once, at deployment time
            ↓
monitor.run_data_drift(current_df)       # drift_detected? → retrain
monitor.run_performance(actuals_df)      # mae / rmse degraded? → retrain

from twiga.serve import ForecastMonitor

monitor = ForecastMonitor(
    report_dir=str(REPORT_DIR),
    target_col="NetLoad(kW)",
    prediction_col="pred_NetLoad",
)

monitor.set_reference(train_df)
log.info("Reference set — %d rows from training split.", len(train_df))
import numpy as np

rng = np.random.default_rng(42)

# Simulate a production window with a distribution shift in Temperature
# (e.g. a heat wave that pushes temperatures +4 °C above the historical baseline)
current_df = test_df.copy()
current_df["Temperature"] = current_df["Temperature"] + rng.normal(4, 1.5, len(current_df))

_, drift_summary = monitor.run_data_drift(
    current_df,
    feature_cols=["Temperature", "Ghi"],
)

log.info("Drift detected   : %s", drift_summary["drift_detected"])
log.info("Dataset score    : %.3f", drift_summary["dataset_drift_score"])
log.info("Drifted features : %d / %d", drift_summary["n_drifted"], drift_summary["total"])
for feat, score in drift_summary.get("feature_drift", {}).items():
    log.info("  %-15s  p-value = %.4f", feat, score)
# Performance monitoring — add a prediction column to a small batch
# We use the loaded_forecaster from section 5 to show round-trip checkpoint → predict
batch = test_df.head(1000).copy()
preds_arr, _ = loaded_forecaster.predict(batch)

# preds_arr values are 3-D arrays: (n_windows, horizon, n_targets).
# Each window i predicts `horizon` steps starting at row `lookback + i * stride`
# in the original batch.  We take the first horizon step of every window and
# align each prediction to its matching batch row.
first_model_key = list(preds_arr.keys())[0]
pred = preds_arr[first_model_key]  # (n_windows, horizon, 1)

lookback = data_config.lookback_window_size
stride = data_config.stride
n_windows = pred.shape[0]

pred_row_indices = [lookback + i * stride for i in range(n_windows)]
monitor_batch = batch.iloc[pred_row_indices].copy()
monitor_batch["pred_NetLoad"] = pred[:, 0, 0]

perf_summary = monitor.run_performance(monitor_batch)
log.info("MAE  : %.4f kW", perf_summary.get("mae", float("nan")))
log.info("RMSE : %.4f kW", perf_summary.get("rmse", float("nan")))
log.info("Report → %s", perf_summary.get("report_path", "n/a"))

7. Model serving with FastAPI#

create_app() is a factory that returns a configured FastAPI application. It wires together:

  • ModelLoader: thread-safe lazy checkpoint loader with double-checked locking

  • ForecastMonitor: Evidently drift and performance checks exposed as HTTP endpoints

  • CPU offloading: fc.predict() runs in a thread pool via anyio.to_thread.run_sync so the async event loop stays responsive under load (both this offload and the locking pattern are sketched below)
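
To make the first and third bullets concrete, here is a minimal sketch of both patterns under stated assumptions — LazyLoaderSketch and heavy_predict are illustrative names, not twiga internals:

import threading

import anyio
from fastapi import FastAPI

class LazyLoaderSketch:
    """Double-checked locking: many threads may race to the first check,
    but only one ever runs the expensive load (illustrative sketch)."""

    def __init__(self, load_fn):
        self._load_fn = load_fn
        self._model = None
        self._lock = threading.Lock()

    def get(self):
        if self._model is None:              # fast path: no lock once loaded
            with self._lock:
                if self._model is None:      # re-check under the lock
                    self._model = self._load_fn()
        return self._model

demo_app = FastAPI()

def heavy_predict(records: list[dict]) -> dict:   # hypothetical CPU-bound work
    return {"n": len(records)}

@demo_app.post("/predict-sketch")
async def predict_sketch(payload: dict):
    # Offload CPU-bound work to a worker thread so the event loop stays free
    return await anyio.to_thread.run_sync(heavy_predict, payload.get("records", []))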

Endpoints#

| Method | Path                 | Description                                     |
|--------|----------------------|-------------------------------------------------|
| GET    | /health              | Model names, horizon, targets, load status      |
| POST   | /predict             | Point forecast from raw records                 |
| POST   | /predict-interval    | Interval forecast (requires probabilistic model)|
| POST   | /monitor/drift       | Run data drift report on the request payload    |
| GET    | /monitor/performance | Latest performance summary                      |
| POST   | /reload              | Hot-reload model from latest checkpoint         |

The cells below use FastAPI's TestClient with the app mounted in-process (no actual server needed).

from twiga.serve import create_app

app = create_app(
    forecaster=forecaster,
    report_dir=str(REPORT_DIR),
    title="Net Load Forecast API — Tutorial 17",
)

log.info("FastAPI app created — routes: %s", [r.path for r in app.routes])
from fastapi.testclient import TestClient

client = TestClient(app)

# Health check — confirms the model is loaded and lists capabilities
resp = client.get("/health")
health = resp.json()

twiga_gt(
    GT(
        pd.DataFrame(
            [
                {"Field": "status", "Value": health.get("status")},
                {"Field": "models", "Value": str(health.get("models"))},
                {"Field": "forecast_horizon", "Value": str(health.get("forecast_horizon"))},
                {"Field": "targets", "Value": str(health.get("targets"))},
            ]
        )
    )
    .tab_header(title=md("**GET /health**"), subtitle="API capability report")
    .cols_label(Field=md("**Field**"), Value=md("**Value**")),
    n_rows=4,
)

# Point forecast — POST /predict
# Convert timestamps to ISO strings so JSON serialisation works
batch_records = (
    test_df.head(1000)
    .assign(timestamp=test_df.head(1000)["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%S"))
    .to_dict("records")
)

resp = client.post("/predict", json={"records": batch_records, "ensemble_strategy": "mean"})
resp.raise_for_status()
forecast_resp = resp.json()

log.info("Response keys : %s", list(forecast_resp.keys()))
log.info("Models returned: %d", len(forecast_resp["forecasts"]))
for f in forecast_resp["forecasts"]:
    log.info("  %-20s  predictions shape: %s", f["model"], str(len(f["predictions"])))
# Data drift via API — POST /monitor/drift
drift_records = (
    current_df.head(500)
    .assign(timestamp=current_df.head(500)["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%S"))
    .to_dict("records")
)

resp = client.post("/monitor/drift", json={"records": drift_records})
drift_api = resp.json()

log.info("drift_detected     : %s", drift_api.get("drift_detected"))
log.info("dataset_drift_score: %.3f", drift_api.get("dataset_drift_score", 0))
log.info("n_drifted          : %d", drift_api.get("n_drifted", 0))
# Hot reload — POST /reload
# In production, retraining_flow calls this after promoting a new model.
# Here we trigger it manually to verify the endpoint works.
resp = client.post("/reload")
log.info("Reload status: %d  body: %s", resp.status_code, resp.json())

Running in production#

To serve real traffic, write the app to a module and start uvicorn:

# serve.py
from sklearn.preprocessing import RobustScaler, StandardScaler

from twiga import TwigaForecaster
from twiga.core.config import DataPipelineConfig, ForecasterConfig
from twiga.models.ml import LIGHTGBMConfig
from twiga.serve import create_app

data_config = DataPipelineConfig(
    target_feature="NetLoad(kW)",
    period="30min",
    latitude=32.371666,
    longitude=-16.274998,
    calendar_features=["hour", "day_night"],
    exogenous_features=["Ghi", "Temperature"],
    forecast_horizon=48,
    lookback_window_size=96,
    input_scaler=StandardScaler(),
    target_scaler=RobustScaler(),
)
train_params = ForecasterConfig(
    split_freq="months", train_size=3, test_size=1,
    checkpoints_path="mlops_demo/checkpoints",
)
forecaster = TwigaForecaster(
    data_params=data_config, model_params=[LIGHTGBMConfig()], train_params=train_params
)
app = create_app(forecaster, report_dir="mlops_demo/reports", title="Net Load Forecast API")

Then start the server from a terminal:

uvicorn serve:app --host 0.0.0.0 --port 8000 --workers 2

Secure the API by setting TWIGA_SERVE_API_KEY - see Section 10.
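
Once the server is running, a quick smoke test against the health endpoint confirms the model loaded (this assumes the localhost:8000 address from the command above and no API key set):

import requests

resp = requests.get("http://localhost:8000/health", timeout=10)
resp.raise_for_status()
print(resp.json())  # status, models, forecast_horizon, targets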

8. Pipeline tasks: direct invocation#

Every Prefect task in twiga.pipeline.tasks exposes its underlying Python function through the .fn attribute, so calling task.fn(...) runs it without a running Prefect server. This is ideal for:

  • Debugging a single step without triggering the full flow

  • Running tasks inside notebooks or unit tests

  • Composing ad-hoc pipelines outside of Prefect

Convention: task.fn(...) bypasses Prefect’s retry logic, logging, and state tracking - it is the raw Python function.

import logging
from unittest.mock import patch

plain_logger = logging.getLogger("tutorial")

# Patch get_run_logger so tasks work without a running Prefect server
with patch("twiga.pipeline.tasks.get_run_logger", return_value=plain_logger):
    from twiga.pipeline.tasks import (
        evaluate_model,
        fit_forecaster,
        load_data,
        run_drift_check,
        save_checkpoint,
    )

    # 1. load_data — read parquet, parse timestamps, split train/test
    tr, te = load_data.fn(
        data_path=DATA_DIR / "train.parquet",
        date_col="timestamp",
    )
    log.info("load_data  → train %d rows | test %d rows", len(tr), len(te))

    # 2. fit_forecaster
    fresh_forecaster = TwigaForecaster(
        data_params=data_config,
        model_params=[model_config],
        train_params=forecaster_config,
    )
    fitted = fit_forecaster.fn(forecaster=fresh_forecaster, train_df=tr, tune=False)
    clear_output()
    log.info("fit_forecaster → model fitted: %s", fitted.model is not None)

    # 3. evaluate_model
    preds_df, met_df = evaluate_model.fn(forecaster=fitted, test_df=te)
    log.info("evaluate_model → metrics: %s", met_df[["mae", "rmse"]].mean().to_dict())

    # 4. save_checkpoint
    ckpt = save_checkpoint.fn(forecaster=fitted)
    log.info("save_checkpoint → %s", ckpt)

    # 5. run_drift_check (reference = train, current = shifted test)
    drift_raw = run_drift_check.fn(
        reference_df=tr,
        current_df=current_df.head(len(tr)),
        report_dir=str(REPORT_DIR),
    )
    log.info("run_drift_check → detected=%s  score=%.3f", drift_raw["drift_detected"], drift_raw["dataset_drift_score"])

9. Full Prefect flows#

In production you schedule training_flow and retraining_flow as Prefect deployments. Both flows return typed dataclasses - use attribute access, not dict keys.

training_flow: full train → evaluate → checkpoint → MLflow#

from twiga.pipeline import training_flow
from twiga.pipeline.schemas import TrainingResult

result: TrainingResult = training_flow(
    forecaster=forecaster,
    data_path="mlops_demo/data/train.parquet",
    date_col="timestamp",
    train_end="2020-12-31",
    test_start="2020-01-01",
    tune=False,
    experiment="tutorial-17",
    run_name="lgbm-flow-v1",
)

# Attribute access - not result["checkpoint_path"]
print(result.checkpoint_path)   # "/path/to/checkpoints/"
print(result.mlflow_run_id)     # "a1b2c3d4e5f6"
print(result.metrics)           # {"lightgbm": {"mae": 4.23, "rmse": 6.11, "corr": 0.97}}

retraining_flow: drift-triggered retrain with promotion logic#

from twiga.pipeline import retraining_flow
from twiga.pipeline.schemas import RetrainingResult, DriftSummary

result: RetrainingResult = retraining_flow(
    forecaster=forecaster,
    reference_data_path="mlops_demo/data/train.parquet",
    current_data_path="mlops_demo/data/test.parquet",
    drift_threshold=0.5,          # retrain if > 50% of features drifted
    performance_threshold=None,   # drift check only in this example
    metric_key="mae",
    minimize=True,
    experiment="tutorial-17-retrain",
    run_name="lgbm-retrain-v1",
    fastapi_reload_url="http://localhost:8000",  # POST /reload after promotion
)

# Typed return - no dict-key guesswork
print(result.retrained)           # True
print(result.promoted)            # True
print(result.checkpoint_path)     # "/path/to/checkpoints/"
print(result.mlflow_run_id)       # "f7e8d9c0b1a2"
print(result.metrics)             # {"lightgbm": {"mae": 3.91, ...}}

# DriftSummary - rich drift context
ds: DriftSummary = result.drift_summary
print(ds.drift_detected)          # True
print(ds.dataset_drift_score)     # 0.72
print(ds.n_drifted)               # 1
print(ds.feature_drift)           # {"Temperature": 0.008, "Ghi": 0.41}
print(ds.report_path)             # "/mlops_demo/reports/data_drift_20240601T120000Z.html"

Scheduling with Prefect deployments#

from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from twiga.pipeline import retraining_flow

deployment = Deployment.build_from_flow(
    flow=retraining_flow,
    name="weekly-retrain",
    schedule=CronSchedule(cron="0 2 * * 1"),  # Every Monday at 02:00
    parameters={
        "reference_data_path": "data/train.parquet",
        "current_data_path":   "data/current.parquet",
        "experiment":          "load-forecast-retrain",
        "fastapi_reload_url":  "http://api:8000",
    },
)
deployment.apply()

Then start an agent to pick up the schedule:

prefect agent start --work-queue default

retraining_flow decision logic#

load reference + current data
         ↓
run_drift_check → DriftSummary
         ↓
drift_fraction > threshold? ──Yes──┐
         │ No                      │
evaluate incumbent                 │
         ↓                         │
perf_threshold breached? ──Yes──→  fit new model on current data
         │ No                              ↓
Return retrained=False          evaluate new model
                                         ↓
                         new_metric < incumbent? ──Yes──→ save_checkpoint
                                         │ No                    ↓
                              Return promoted=False    POST /reload → Return promoted=True
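
The same decision logic as illustrative Python pseudocode — fit_new_model, evaluate, save_checkpoint, and post_reload are hypothetical stand-ins for the flow's tasks, not its actual internals:

def retraining_decision(drift, incumbent_metric, drift_threshold, perf_threshold):
    """Mirror of the diagram above (illustrative pseudocode, minimize=True case)."""
    retrain = (drift.n_drifted / drift.total) > drift_threshold
    if not retrain and perf_threshold is not None:
        retrain = incumbent_metric > perf_threshold      # incumbent degraded
    if not retrain:
        return {"retrained": False, "promoted": False}

    new_model = fit_new_model()                          # hypothetical helper
    new_metric = evaluate(new_model)                     # hypothetical helper
    if new_metric < incumbent_metric:                    # better → promote
        save_checkpoint(new_model)                       # hypothetical helper
        post_reload()                                    # POST /reload, hypothetical helper
        return {"retrained": True, "promoted": True}
    return {"retrained": True, "promoted": False}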

10. Configuration: TwigaSettings#

TwigaSettings is a pydantic-settings model that reads TWIGA_* environment variables (or a .env file). All MLOps components use it to pick up runtime config without hardcoded values - 12-factor app style.

from twiga.core.settings import TwigaSettings

settings = TwigaSettings()   # reads env or .env file
print(settings.mlflow_tracking_uri)    # http://localhost:5000
print(settings.model_checkpoint_dir)  # checkpoints
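
For intuition, here is roughly how such a settings model is declared with pydantic-settings — an illustrative sketch using the field names from the table below, not the actual TwigaSettings source:

from pydantic_settings import BaseSettings, SettingsConfigDict

class DemoSettings(BaseSettings):
    """Sketch of a TWIGA_-prefixed settings model (not TwigaSettings itself)."""
    model_config = SettingsConfigDict(
        env_prefix="TWIGA_",
        env_file=".env",
        protected_namespaces=(),  # allow the model_checkpoint_dir field name
    )

    mlflow_tracking_uri: str = "http://localhost:5000"
    mlflow_experiment: str = "twiga"
    serve_api_key: str = ""
    model_checkpoint_dir: str = "checkpoints"
    serve_report_dir: str = "reports"

# Environment wins over defaults, e.g.:
#   TWIGA_MLFLOW_TRACKING_URI=http://mlflow.internal:5000 python app.py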

| Environment variable       | Default               | Used by                          |
|----------------------------|-----------------------|----------------------------------|
| TWIGA_MLFLOW_TRACKING_URI  | http://localhost:5000 | TwigaTracker                     |
| TWIGA_MLFLOW_EXPERIMENT    | twiga                 | TwigaTracker                     |
| TWIGA_SERVE_API_KEY        | "" (open)             | create_app - sets X-API-Key auth |
| TWIGA_MODEL_CHECKPOINT_DIR | checkpoints           | ForecasterConfig default         |
| TWIGA_SERVE_REPORT_DIR     | reports               | ForecastMonitor default          |

Example: lock the API behind a secret key#

export TWIGA_SERVE_API_KEY="my-secret-token"

All /predict and /monitor requests then require the X-API-Key header:

import requests

headers = {"X-API-Key": "my-secret-token"}
resp = requests.post("http://localhost:8000/predict", json=payload, headers=headers)  # payload as in Section 7

Example: remote MLflow tracking#

export TWIGA_MLFLOW_TRACKING_URI="http://mlflow.internal:5000"
export TWIGA_MLFLOW_EXPERIMENT="production-load-forecast"

All TwigaTracker calls automatically use the remote server without any code changes.

from twiga.core.settings import TwigaSettings

settings = TwigaSettings()

settings_df = pd.DataFrame(
    [
        {"Variable": "TWIGA_MLFLOW_TRACKING_URI", "Current value": str(settings.mlflow_tracking_uri)},
        {"Variable": "TWIGA_MLFLOW_EXPERIMENT", "Current value": str(settings.mlflow_experiment)},
        {
            "Variable": "TWIGA_SERVE_API_KEY",
            "Current value": "(empty — open access)" if not settings.serve_api_key else "***",
        },
        {"Variable": "TWIGA_MODEL_CHECKPOINT_DIR", "Current value": str(settings.model_checkpoint_dir)},
        {"Variable": "TWIGA_SERVE_REPORT_DIR", "Current value": str(settings.serve_report_dir)},
    ]
)

twiga_gt(
    GT(settings_df)
    .tab_header(
        title=md("**TwigaSettings — active configuration**"),
        subtitle="Override any value via environment variable or .env file",
    )
    .cols_label(Variable=md("**Environment variable**"), **{"Current value": md("**Current value**")})
    .tab_source_note("All variables use the TWIGA_ prefix and are read via pydantic-settings"),
    n_rows=len(settings_df),
)

11. API summary#

api_df = pd.DataFrame(
    {
        "Module": [
            "twiga.tracking",
            "twiga.tracking",
            "twiga.forecaster",
            "twiga.forecaster",
            "twiga.serve",
            "twiga.serve",
            "twiga.serve",
            "twiga.serve",
            "twiga.pipeline",
            "twiga.pipeline",
            "twiga.pipeline",
            "twiga.core",
        ],
        "Class / Function": [
            "TwigaTracker",
            "TwigaTracker.log_*",
            "on_save_checkpoint()",
            "on_load_checkpoint()",
            "create_app()",
            "ModelLoader",
            "ForecastMonitor",
            "ForecastMonitor.run_data_drift",
            "training_flow",
            "retraining_flow",
            "task.fn(...)",
            "TwigaSettings",
        ],
        "What it does": [
            "Context manager around an MLflow run",
            "Log dataset lineage, configs, model artefact, evaluation metrics",
            "Persist fitted model + write versioned _checkpoint_manifest.json",
            "Read manifest → load model; fallback to newest .pkl if no manifest",
            "FastAPI factory — wires ModelLoader, ForecastMonitor, CPU offloading",
            "Thread-safe lazy checkpoint loader (double-checked locking)",
            "Evidently wrapper for drift and performance monitoring",
            "Detect distribution shift in input features; write HTML report",
            "Full Prefect flow: load → fit → evaluate → checkpoint → MLflow",
            "Drift-aware Prefect flow: check → retrain → promote if better",
            "Run a single Prefect task outside a flow (notebooks / tests)",
            "12-factor env-var config for all MLOps components",
        ],
    }
)

twiga_gt(
    GT(api_df)
    .tab_header(
        title=md("**Tutorial 17 — MLOps API Quick Reference**"),
        subtitle="All classes and functions introduced in this notebook",
    )
    .cols_label(
        Module=md("**Module**"),
        **{"Class / Function": md("**Class / Function**"), "What it does": md("**What it does**")},
    )
    .tab_source_note("Full API docs → twiga-forecast.readthedocs.io/mlops"),
    n_rows=len(api_df),
)

Wrapping up#

What you did

  • Tracked an experiment end-to-end with TwigaTracker (dataset, params, model, metrics)

  • Saved a versioned checkpoint and round-tripped it with on_save_checkpoint / on_load_checkpoint

  • Detected a simulated temperature distribution shift with ForecastMonitor.run_data_drift

  • Served forecasts over HTTP with create_app() and exercised the /health, /predict, /monitor/drift, and /reload endpoints

  • Ran individual pipeline tasks via .fn() without a Prefect server

  • Read typed TrainingResult / RetrainingResult / DriftSummary return values

  • Reviewed TwigaSettings env-var configuration for 12-factor deployments

Key takeaways

  1. TwigaTracker is a five-step workflow: dataset → metadata → model → evaluation → run_id.

  2. Every checkpoint is versioned via _checkpoint_manifest.json - rollbacks are auditable.

  3. create_app() handles thread safety and CPU offloading automatically - don’t call fc.predict() directly from async endpoints.

  4. Pipeline flows return dataclasses - use result.checkpoint_path, not result["checkpoint_path"].

  5. All runtime config lives in TwigaSettings - never hardcode URIs or secrets in code.


What’s next?

  • Tutorial 10: Hyperparameter tuning with Optuna (tune=True in training_flow)

  • Tutorial 11: Conformal prediction for calibrated interval forecasts served via /predict-interval

  • Tutorial 12: Neural network forecasters (NHITS, MLP) as drop-in replacements in the same MLOps pipeline