Model Serving#

Source files
  • twiga/serve/app.py - create_app()

  • twiga/serve/loader.py - ModelLoader

  • twiga/serve/schemas.py - request / response schemas

Overview#

twiga.serve exposes a fitted TwigaForecaster as a production-ready FastAPI REST service. The application is created by the create_app() factory and served with uvicorn. Install the serving dependencies with the mlops extra:

pip install "twiga[mlops]"

Quick start#

# serve.py
from twiga import TwigaForecaster
from twiga.core.config import DataPipelineConfig, ForecasterConfig
from twiga.serve import create_app

forecaster = TwigaForecaster(
    data_params=DataPipelineConfig(...),
    model_params=[...],
    train_params=ForecasterConfig(...),
)

app = create_app(forecaster, report_dir="reports/", title="Load Forecast API")

Then serve it with uvicorn:

uvicorn serve:app --host 0.0.0.0 --port 8000

The forecaster checkpoint is loaded once at startup via ModelLoader. The interactive OpenAPI docs are available at http://localhost:8000/docs.


Endpoints#

Method  Path                  Tag       Description
GET     /health               Ops       Readiness probe - returns model metadata
POST    /predict              Forecast  Point forecast
POST    /predict-interval     Forecast  Conformal prediction interval
POST    /monitor/drift        Monitor   Evidently data drift report
POST    /monitor/performance  Monitor   Evidently regression performance report
POST    /reload               Ops       Hot-reload checkpoint from disk

GET /health#

Returns service status and loaded model metadata:

{
  "status": "ok",
  "models": ["lightgbm", "catboost"],
  "forecast_horizon": 24,
  "targets": ["Load(MW)"]
}
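The payload above can feed a readiness decision directly. A minimal sketch (hypothetical helper, stdlib only) that decides readiness from a /health body:

```python
import json

def is_ready(body: str) -> bool:
    """Return True when the /health payload reports a loaded model."""
    payload = json.loads(body)
    return payload.get("status") == "ok" and bool(payload.get("models"))

# Example body as documented above
body = '{"status": "ok", "models": ["lightgbm", "catboost"], "forecast_horizon": 24, "targets": ["Load(MW)"]}'
print(is_ready(body))  # → True
```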

POST /predict#

Request

{
  "records": [
    {"timestamp": "2024-01-01T00:00:00Z", "temperature": 12.5, "hour": 0},
    {"timestamp": "2024-01-01T01:00:00Z", "temperature": 12.1, "hour": 1}
  ],
  "ensemble_strategy": "mean",
  "prepare_test_data": true
}

Response - ForecastResponse

{
  "forecasts": [
    {"model": "lightgbm", "predictions": [[[120.5], [118.3], ...]], "inference_time": 0.0032},
    {"model": "catboost", "predictions": [[[121.0], [119.1], ...]], "inference_time": 0.0029}
  ],
  "targets": ["Load(MW)"],
  "forecast_horizon": 24
}
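Per-model payloads follow the ModelPrediction schema (model, predictions, inference_time; see the API reference), with predictions shaped (n_batch, n_horizon, n_targets). A minimal consumption sketch, stdlib only, with a hand-written sample standing in for a live response:

```python
# Index per-model forecasts from a ForecastResponse-shaped payload.
sample = {
    "forecasts": [
        {"model": "lightgbm", "predictions": [[[120.5], [118.3]]], "inference_time": 0.0032},
        {"model": "catboost", "predictions": [[[121.0], [119.1]]], "inference_time": 0.0029},
    ],
    "targets": ["Load(MW)"],
    "forecast_horizon": 2,
}

by_model = {f["model"]: f["predictions"] for f in sample["forecasts"]}
# First batch, all horizon steps, first (only) target:
lightgbm_path = [step[0] for step in by_model["lightgbm"][0]]
print(lightgbm_path)  # → [120.5, 118.3]
```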

POST /predict-interval#

Same request shape as /predict plus an alpha field:

{
  "records": [...],
  "alpha": 0.1,
  "ensemble_strategy": "mean"
}

Response - IntervalResponse

{
  "forecasts": [
    {
      "model": "lightgbm",
      "lower": [[[105.0], ...]],
      "forecast": [[[120.5], ...]],
      "upper": [[[136.0], ...]],
      "coverage": 0.9,
      "inference_time": 0.0035
    }
  ],
  "targets": ["Load(MW)"],
  "forecast_horizon": 24
}

Requires conformal calibration (forecaster.calibrate(...)) before serving.
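Twiga's calibration internals are not shown here; as a rough illustration of the split-conformal idea behind /predict-interval (hypothetical function, stdlib only): a finite-sample quantile of absolute calibration residuals widens the point forecast symmetrically.

```python
import math

def split_conformal_interval(point, abs_residuals, alpha=0.1):
    """Symmetric split-conformal band: widen each point forecast by the
    finite-sample (1 - alpha) quantile of absolute calibration residuals.
    Illustrative sketch only - not Twiga's calibration code."""
    n = len(abs_residuals)
    # conservative quantile rank: ceil((n + 1) * (1 - alpha)), 1-based
    k = min(n, math.ceil((n + 1) * (1 - alpha)))
    q = sorted(abs_residuals)[k - 1]
    return [(p - q, p + q) for p in point]

# ten calibration residuals; alpha=0.1 picks the largest one here
residuals = [0.5, 1.2, 0.8, 2.0, 1.5, 0.25, 1.1, 0.9, 1.75, 0.6]
intervals = split_conformal_interval([120.5, 118.25], residuals, alpha=0.1)
print(intervals)  # → [(118.5, 122.5), (116.25, 120.25)]
```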

POST /reload#

Hot-reloads the checkpoint without restarting the process:

curl -X POST http://localhost:8000/reload
# → {"status": "reloaded", "models": "['lightgbm', 'catboost']"}

API reference#

create_app#

twiga.serve.app.create_app(forecaster, report_dir='reports', title='Twiga Forecast API', version='0.1.0', api_key=None)#

Create and configure the Twiga serving FastAPI application.

Parameters:
  • forecaster (TwigaForecaster) – Configured (fitted or loadable) forecaster instance.

  • report_dir (str) – Directory where monitoring reports are stored.

  • title (str) – Title shown in the OpenAPI docs.

  • version (str) – Semantic version string for the API.

  • api_key (str | None) – Secret key for X-API-Key auth. Pass None to run without auth (development only). Defaults to the value of the TWIGA_SERVE_API_KEY environment variable when not supplied.

Return type:

FastAPI

Returns:

A configured fastapi.FastAPI instance.
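The environment-variable fallback for api_key can be replicated as follows. This is a sketch of the documented default behaviour, not twiga's actual code; resolve_api_key is a hypothetical helper, and a sentinel is used because an explicit None (auth disabled) must differ from "argument omitted":

```python
import os

_UNSET = object()  # sentinel so an explicit None (auth off) differs from "omitted"

def resolve_api_key(api_key=_UNSET, env_var="TWIGA_SERVE_API_KEY"):
    """Resolve the serving API key: an explicit argument (including None)
    wins; an omitted one falls back to the environment."""
    if api_key is not _UNSET:
        return api_key
    return os.environ.get(env_var)

os.environ["TWIGA_SERVE_API_KEY"] = "s3cret"
print(resolve_api_key())              # → s3cret
print(resolve_api_key(api_key=None))  # → None
```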

ModelLoader#

class twiga.serve.loader.ModelLoader(forecaster)#

Bases: object

Lazy, thread-safe checkpoint loader for a TwigaForecaster.

The forecaster is deserialised from disk exactly once on first access and cached for the process lifetime. A threading.Lock ensures that concurrent requests from multiple threads (e.g. a Uvicorn worker with a thread pool) cannot trigger a double-load race.

Parameters:

forecaster (TwigaForecaster) – A pre-instantiated (but not yet loaded) forecaster whose checkpoints_path points to a valid checkpoint directory.

Example:

loader = ModelLoader(forecaster)
forecaster = loader.load()  # loads once; subsequent calls are no-ops
loader.reload()  # force reload from disk

property forecast_horizon: int#

Forecast horizon from the fitted data pipeline.

Raises:

NotFittedError – If the forecaster has not been loaded yet.

property is_loaded: bool#

True if the forecaster has been successfully loaded.

load()#

Load the forecaster from its checkpoint directory.

Idempotent and thread-safe - subsequent calls return the cached instance without re-reading from disk.

Return type:

TwigaForecaster

Returns:

The loaded TwigaForecaster.

Raises:

NotFittedError – If no checkpoint files are found.

property model_names: list[str]#

Names of all loaded models.

Raises:

NotFittedError – If the forecaster has not been loaded yet.

reload()#

Force a fresh load from disk, discarding the cached model.

Useful for hot-reloading after a new model version has been promoted to the checkpoint directory.

Return type:

TwigaForecaster

Returns:

The freshly loaded TwigaForecaster.

Raises:

NotFittedError – If no checkpoint files are found.

property targets: list[str]#

Target variable names from the fitted data pipeline.

Raises:

NotFittedError – If the forecaster has not been loaded yet.
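The lazy, double-checked-locking behaviour described above can be sketched in plain Python. This is an illustrative stand-in, not twiga's implementation; the load_fn callable plays the role of checkpoint deserialisation:

```python
import threading

class LazyLoader:
    """Minimal stand-in for ModelLoader's lazy, thread-safe caching:
    load_fn (here a hypothetical deserialiser) runs at most once."""

    def __init__(self, load_fn):
        self._load_fn = load_fn
        self._lock = threading.Lock()
        self._obj = None

    def load(self):
        if self._obj is None:              # fast path, no lock once cached
            with self._lock:
                if self._obj is None:      # re-check: another thread may have won
                    self._obj = self._load_fn()
        return self._obj

    def reload(self):
        with self._lock:                   # force a fresh load, discard cache
            self._obj = self._load_fn()
        return self._obj

calls = []
loader = LazyLoader(lambda: calls.append(None) or object())
assert loader.load() is loader.load()  # cached: same object back
assert len(calls) == 1                 # deserialised exactly once
```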

Request / response schemas#

Pydantic request and response schemas for the Twiga serving API.

All schemas use strict typing and are self-documenting through Field descriptions, consistent with the rest of the Twiga config system.

class twiga.serve.schemas.ForecastRequest(**data)#

Bases: BaseModel

Payload for a point or interval forecast request.

Parameters:
  • records (list[dict[str, Any]]) – Time series records as a list of dicts (each dict is one row, must contain the timestamp column and all feature columns required by the fitted pipeline).

  • ensemble_strategy (Literal['mean', 'median', 'weighted_mean'] | None) – How to combine predictions from multiple models. None returns predictions per model.

  • prepare_test_data (bool) – Whether to prepend training tail rows before transforming (mirrors the forecaster flag).

Example:

{
    "records": [
        {"timestamp": "2024-01-01T00:00:00", "temperature": 12.5, ...},
        ...
    ],
    "ensemble_strategy": "mean"
}
ensemble_strategy: Literal['mean', 'median', 'weighted_mean'] | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

prepare_test_data: bool#
records: list[dict[str, Any]]#
classmethod records_not_empty(v)#

Validate records list is non-empty.

Return type:

list[dict[str, Any]]

class twiga.serve.schemas.ForecastResponse(**data)#

Bases: BaseModel

Response for a point forecast request.

Parameters:
  • forecasts (list[ModelPrediction]) – Per-model prediction payloads.

  • targets (list[str]) – Target variable names in the same order as the innermost dimension of each predictions array.

  • forecast_horizon (int) – Number of future time steps predicted.

forecast_horizon: int#
forecasts: list[ModelPrediction]#
classmethod from_arrays(predictions, inference_times, targets, forecast_horizon)#

Build a response from raw numpy prediction arrays.

Parameters:
  • predictions (dict[str, ndarray]) – Mapping of model name → array (B, H, T).

  • inference_times (dict[str, float]) – Mapping of model name → seconds.

  • targets (list[str]) – Target variable names.

  • forecast_horizon (int) – Number of forecast steps.

Return type:

ForecastResponse

Returns:

Populated ForecastResponse.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

targets: list[str]#
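The mapping performed by from_arrays can be illustrated with plain nested lists standing in for numpy arrays. A hypothetical sketch (the real classmethod also validates the result against the Pydantic schema):

```python
def build_forecast_payload(predictions, inference_times, targets, forecast_horizon):
    """Flatten {model: (B, H, T) array-like} plus timings into the
    documented ForecastResponse shape (illustrative stand-in)."""
    return {
        "forecasts": [
            {
                "model": name,
                "predictions": array,  # shape (B, H, T) as nested lists
                "inference_time": inference_times[name],
            }
            for name, array in predictions.items()
        ],
        "targets": targets,
        "forecast_horizon": forecast_horizon,
    }

payload = build_forecast_payload(
    predictions={"lightgbm": [[[120.5], [118.3]]]},  # one batch, two steps, one target
    inference_times={"lightgbm": 0.0032},
    targets=["Load(MW)"],
    forecast_horizon=2,
)
```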
class twiga.serve.schemas.HealthRequest(**data)#

Bases: BaseModel

Empty request body for health-check endpoints.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class twiga.serve.schemas.HealthResponse(**data)#

Bases: BaseModel

Health-check response.

Parameters:
  • status (Literal['ok', 'degraded']) – "ok" when the service is ready.

  • models (list[str]) – Names of loaded models.

  • forecast_horizon (int) – Configured forecast horizon.

  • targets (list[str]) – Configured target variables.

forecast_horizon: int#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

models: list[str]#
status: Literal['ok', 'degraded']#
targets: list[str]#
class twiga.serve.schemas.IntervalPrediction(**data)#

Bases: BaseModel

Conformal interval prediction for a single model.

Parameters:
  • model (str) – Model identifier string.

  • lower (list[list[list[float]]]) – Lower bound array (n_batch, n_horizon, n_targets).

  • forecast (list[list[list[float]]]) – Point forecast array (n_batch, n_horizon, n_targets).

  • upper (list[list[list[float]]]) – Upper bound array (n_batch, n_horizon, n_targets).

  • coverage (float) – Nominal coverage level (1 - alpha).

  • inference_time (float) – Wall-clock seconds spent on inference.

coverage: float#
forecast: list[list[list[float]]]#
inference_time: float#
lower: list[list[list[float]]]#
model: str#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

upper: list[list[list[float]]]#
class twiga.serve.schemas.IntervalRequest(**data)#

Bases: ForecastRequest

Payload for a conformal prediction interval request.

Extends ForecastRequest with coverage level control.

Parameters:

alpha (float) – Miscoverage rate; alpha=0.1 targets 90% coverage.

alpha: float#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class twiga.serve.schemas.IntervalResponse(**data)#

Bases: BaseModel

Response for a conformal interval forecast request.

Parameters:
  • forecasts (list[IntervalPrediction]) – Per-model interval prediction payloads.

  • targets (list[str]) – Target variable names.

  • forecast_horizon (int) – Number of future time steps predicted.

forecast_horizon: int#
forecasts: list[IntervalPrediction]#
classmethod from_arrays(predictions, inference_times, targets, forecast_horizon, alpha=0.1)#

Build a response from raw numpy interval arrays.

Parameters:
  • predictions (dict[str, tuple[ndarray, ndarray, ndarray]]) – Mapping of model name → (lower, forecast, upper).

  • inference_times (dict[str, float]) – Mapping of model name → seconds.

  • targets (list[str]) – Target variable names.

  • forecast_horizon (int) – Number of forecast steps.

  • alpha (float) – Miscoverage rate used during calibration.

Return type:

IntervalResponse

Returns:

Populated IntervalResponse.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

targets: list[str]#
class twiga.serve.schemas.ModelPrediction(**data)#

Bases: BaseModel

Point predictions for a single model.

Parameters:
  • model (str) – Model identifier string.

  • predictions (list[list[list[float]]]) – Nested list of shape (n_batch, n_horizon, n_targets).

  • inference_time (float) – Wall-clock seconds spent on inference.

inference_time: float#
model: str#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

predictions: list[list[list[float]]]#
class twiga.serve.schemas.MonitorResponse(**data)#

Bases: BaseModel

Response payload for a monitoring / drift report request.

Parameters:
  • drift_detected (bool) – Whether data drift was detected overall.

  • n_drifted_features (int) – Number of features that drifted.

  • feature_drift (dict[str, float]) – Per-feature drift scores.

  • report_path (str | None) – Path to the full HTML report on disk.

drift_detected: bool#
feature_drift: dict[str, float]#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

n_drifted_features: int#
report_path: str | None#
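A response in this shape can drive a simple alerting decision in a monitoring job. A sketch with a hypothetical threshold policy, stdlib only:

```python
def should_alert(monitor_response, max_drifted=3):
    """Turn a MonitorResponse-shaped dict into a go/no-go alerting signal.
    The max_drifted threshold is a hypothetical policy choice."""
    return bool(
        monitor_response["drift_detected"]
        or monitor_response["n_drifted_features"] > max_drifted
    )

resp = {
    "drift_detected": False,
    "n_drifted_features": 5,
    "feature_drift": {"temperature": 0.12, "hour": 0.03},
    "report_path": "reports/drift.html",
}
print(should_alert(resp))  # → True (five drifted features exceed the threshold)
```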

ModelLoader#

ModelLoader handles lazy, cached checkpoint loading so that deserialisation happens once at startup rather than on every request.

from twiga.serve import ModelLoader

loader = ModelLoader(forecaster)
fc = loader.load()    # loads from checkpoints_path; cached on subsequent calls
loader.reload()       # force fresh load (e.g. after a new model version is saved)

Key properties exposed after loading:

Property          Type       Description
is_loaded         bool       True after a successful load
targets           list[str]  Target variable names
forecast_horizon  int        Forecast steps ahead
model_names       list[str]  Names of all loaded models


Deployment tips#

Pre-fork model loading - For multi-worker deployments use gunicorn with --preload so the checkpoint is loaded once in the master process and forked into each worker, avoiding repeated disk I/O:

gunicorn serve:app -k uvicorn.workers.UvicornWorker -w 4 --preload

Health checks - Wire /health to your container orchestrator’s liveness and readiness probes. The endpoint returns status: "degraded" if the model has not been loaded successfully.

Hot reload - After saving a new checkpoint, trigger /reload via your deployment pipeline instead of restarting the container.

See Pipeline for how training_flow saves checkpoints and triggers /reload automatically via Prefect.