Model Serving#
Source files:

- `twiga/serve/app.py` - `create_app()`
- `twiga/serve/loader.py` - `ModelLoader`
- `twiga/serve/schemas.py` - request / response schemas
Overview#
`twiga.serve` wraps a fitted `TwigaForecaster` in a production-ready
FastAPI REST API. The application is created
by the `create_app()` factory and served with uvicorn. Install the
serving extras first:

```shell
pip install twiga[mlops]
```
Quick start#
```python
# serve.py
from twiga import TwigaForecaster
from twiga.core.config import DataPipelineConfig, ForecasterConfig
from twiga.serve import create_app

forecaster = TwigaForecaster(
    data_params=DataPipelineConfig(...),
    model_params=[...],
    train_params=ForecasterConfig(...),
)

app = create_app(forecaster, report_dir="reports/", title="Load Forecast API")
```

```shell
uvicorn serve:app --host 0.0.0.0 --port 8000
```
The forecaster checkpoint is loaded once at startup via ModelLoader. The
interactive OpenAPI docs are available at http://localhost:8000/docs.
Endpoints#
| Method | Path | Tag | Description |
|---|---|---|---|
| GET | `/health` | Ops | Readiness probe - returns model metadata |
| POST | `/predict` | Forecast | Point forecast |
| POST | `/predict-interval` | Forecast | Conformal prediction interval |
|  |  | Monitor | Evidently data drift report |
|  |  | Monitor | Evidently regression performance |
| POST | `/reload` | Ops | Hot-reload checkpoint from disk |
GET /health#
Returns service status and loaded model metadata:
```json
{
  "status": "ok",
  "models": ["lightgbm", "catboost"],
  "forecast_horizon": 24,
  "targets": ["Load(MW)"]
}
```
POST /predict#
Request
```json
{
  "records": [
    {"timestamp": "2024-01-01T00:00:00Z", "temperature": 12.5, "hour": 0},
    {"timestamp": "2024-01-01T01:00:00Z", "temperature": 12.1, "hour": 1}
  ],
  "ensemble_strategy": "mean",
  "prepare_test_data": true
}
```
Response - `ForecastResponse`

```json
{
  "predictions": {
    "lightgbm": [[120.5, 118.3, ...]],
    "catboost": [[121.0, 119.1, ...]]
  },
  "inference_time_ms": {"lightgbm": 3.2, "catboost": 2.9},
  "targets": ["Load(MW)"],
  "forecast_horizon": 24
}
```
POST /predict-interval#
Same request shape as /predict plus an alpha field:
```json
{
  "records": [...],
  "alpha": 0.1,
  "ensemble_strategy": "mean"
}
```
Response - `IntervalResponse`

```json
{
  "lower": {"lightgbm": [[105.0, ...]]},
  "point": {"lightgbm": [[120.5, ...]]},
  "upper": {"lightgbm": [[136.0, ...]]},
  "alpha": 0.1,
  "coverage": 0.9,
  "targets": ["Load(MW)"],
  "forecast_horizon": 24
}
```
Requires conformal calibration (`forecaster.calibrate(...)`) before serving.
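The relationship `coverage = 1 - alpha` can be checked empirically: over a held-out set, the fraction of true values falling inside `[lower, upper]` should be close to `1 - alpha`. An illustrative check with toy numbers (not a twiga API):

```python
def empirical_coverage(lower, upper, actual):
    """Fraction of actuals that fall inside their prediction interval."""
    inside = sum(lo <= y <= hi for lo, hi, y in zip(lower, upper, actual))
    return inside / len(actual)

lower  = [105.0, 110.0, 95.0, 100.0]
upper  = [136.0, 140.0, 125.0, 130.0]
actual = [120.5, 142.0, 118.0, 101.0]  # one miss: 142.0 > 140.0

print(empirical_coverage(lower, upper, actual))  # → 0.75
```

With a well-calibrated model and `alpha=0.1`, this fraction should hover around 0.9 on sufficiently large held-out data.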
POST /reload#
Hot-reloads the checkpoint without restarting the process:
```shell
curl -X POST http://localhost:8000/reload
# → {"status": "reloaded", "models": "['lightgbm', 'catboost']"}
```
API reference#
create_app#
- twiga.serve.app.create_app(forecaster, report_dir='reports', title='Twiga Forecast API', version='0.1.0', api_key=None)#
Create and configure the Twiga serving FastAPI application.

- Parameters:
  - forecaster (`TwigaForecaster`) – Configured (fitted or loadable) forecaster instance.
  - report_dir (`str`) – Directory where monitoring reports are stored.
  - title (`str`) – Title shown in the OpenAPI docs.
  - version (`str`) – Semantic version string for the API.
  - api_key (`str | None`) – Secret key for `X-API-Key` auth. Pass `None` to run without auth (development only). Defaults to the value of the `TWIGA_SERVE_API_KEY` environment variable when not supplied.
- Return type:
  `FastAPI`
- Returns:
  A configured `fastapi.FastAPI` instance.
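The `api_key` fallback behaviour amounts to two simple comparisons. A hypothetical sketch (the `resolve_api_key` and `is_authorized` names are illustrative, not twiga's internals):

```python
import os

def resolve_api_key(explicit=None):
    """An explicit key wins; otherwise fall back to the environment variable."""
    return explicit if explicit is not None else os.environ.get("TWIGA_SERVE_API_KEY")

def is_authorized(header_value, api_key):
    """A missing key disables auth entirely (development only)."""
    if api_key is None:
        return True
    return header_value == api_key

print(is_authorized("s3cret", resolve_api_key("s3cret")))  # → True
```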
ModelLoader#
- class twiga.serve.loader.ModelLoader(forecaster)#
Bases: `object`

Lazy, thread-safe checkpoint loader for a `TwigaForecaster`.

The forecaster is deserialised from disk exactly once on first access and cached for the process lifetime. A `threading.Lock` ensures that concurrent requests from multiple threads (e.g. a Uvicorn worker with a thread pool) cannot trigger a double-load race.

- Parameters:
  - forecaster (`TwigaForecaster`) – A pre-instantiated (but not yet loaded) forecaster whose `checkpoints_path` points to a valid checkpoint directory.

Example:

```python
loader = ModelLoader(forecaster)
forecaster = loader.load()  # loads once; subsequent calls are no-ops
loader.reload()             # force reload from disk
```
- property forecast_horizon: int#
Forecast horizon from the fitted data pipeline.
- Raises:
NotFittedError – If the forecaster has not been loaded yet.
- load()#
Load the forecaster from its checkpoint directory.
Idempotent and thread-safe - subsequent calls return the cached instance without re-reading from disk.
- Return type:
  `TwigaForecaster`
- Returns:
  The loaded `TwigaForecaster`.
- Raises:
  - NotFittedError – If no checkpoint files are found.
  - ValueError – If `checkpoints_path` is not configured.
- property model_names: list[str]#
Names of all loaded models.
- Raises:
NotFittedError – If the forecaster has not been loaded yet.
- reload()#
Force a fresh load from disk, discarding the cached model.
Useful for hot-reloading after a new model version has been promoted to the checkpoint directory.
- Return type:
  `TwigaForecaster`
- Returns:
  The freshly loaded `TwigaForecaster`.
- Raises:
  NotFittedError – If no checkpoint files are found.
- property targets: list[str]#
Target variable names from the fitted data pipeline.
- Raises:
NotFittedError – If the forecaster has not been loaded yet.
Request / response schemas#
Pydantic request and response schemas for the Twiga serving API.
All schemas use strict typing and are self-documenting through Field
descriptions, consistent with the rest of the Twiga config system.
- class twiga.serve.schemas.ForecastRequest(**data)#
Bases: `BaseModel`

Payload for a point or interval forecast request.

- Parameters:
  - records (list[dict[str, Any]]) – Time series records as a list of dicts (each dict is one row; must contain the `timestamp` column and all feature columns required by the fitted pipeline).
  - ensemble_strategy (Literal['mean', 'median', 'weighted_mean'] | None) – How to combine predictions from multiple models. `None` returns predictions per model.
  - prepare_test_data (bool) – Whether to prepend training tail rows before transforming (mirrors the forecaster flag).

Example:

```json
{
  "records": [
    {"timestamp": "2024-01-01T00:00:00", "temperature": 12.5, ...},
    ...
  ],
  "ensemble_strategy": "mean"
}
```
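Before posting, a client can sanity-check that every record carries the required `timestamp` column. A small illustrative validator (plain Python, independent of the Pydantic schema; `check_records` is a hypothetical helper):

```python
def check_records(records: list[dict]) -> list[int]:
    """Return indices of records missing the required timestamp column."""
    return [i for i, row in enumerate(records) if "timestamp" not in row]

records = [
    {"timestamp": "2024-01-01T00:00:00", "temperature": 12.5},
    {"temperature": 12.1},  # missing timestamp
]
print(check_records(records))  # → [1]
```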
- class twiga.serve.schemas.ForecastResponse(**data)#
Bases: `BaseModel`

Response for a point forecast request.

- Parameters:
  - forecasts (list[ModelPrediction]) – Per-model prediction payloads.
  - targets (list[str]) – Target variable names in the same order as the innermost dimension of each `predictions` array.
  - forecast_horizon (int) – Number of future time steps predicted.
- forecasts: list[ModelPrediction]#
- classmethod from_arrays(predictions, inference_times, targets, forecast_horizon)#
Build a response from raw numpy prediction arrays.
- Parameters:
- Return type:
  `ForecastResponse`
- Returns:
  Populated `ForecastResponse`.
- class twiga.serve.schemas.HealthRequest(**data)#
Bases: `BaseModel`

Empty request body for health-check endpoints.
- class twiga.serve.schemas.HealthResponse(**data)#
Bases: `BaseModel`

Health-check response.
- Parameters:
- class twiga.serve.schemas.IntervalPrediction(**data)#
Bases: `BaseModel`

Conformal interval prediction for a single model.

- Parameters:
  - model (str) – Model identifier string.
  - lower (list[list[list[float]]]) – Lower bound array `(n_batch, n_horizon, n_targets)`.
  - forecast (list[list[list[float]]]) – Point forecast array `(n_batch, n_horizon, n_targets)`.
  - upper (list[list[list[float]]]) – Upper bound array `(n_batch, n_horizon, n_targets)`.
  - coverage (float) – Nominal coverage level `(1 - alpha)`.
  - inference_time (float) – Wall-clock seconds spent on inference.
- class twiga.serve.schemas.IntervalRequest(**data)#
Bases: `ForecastRequest`

Payload for a conformal prediction interval request.

Extends `ForecastRequest` with coverage level control.

- Parameters:
  - alpha (float) – Miscoverage rate. `alpha=0.1` targets 90 % coverage.
- class twiga.serve.schemas.IntervalResponse(**data)#
Bases: `BaseModel`

Response for a conformal interval forecast request.

- Parameters:
  - forecasts (list[IntervalPrediction]) – Per-model interval prediction payloads.
  - forecast_horizon (int) – Number of future time steps predicted.
- forecasts: list[IntervalPrediction]#
- classmethod from_arrays(predictions, inference_times, targets, forecast_horizon, alpha=0.1)#
Build a response from raw numpy interval arrays.
- Parameters:
- Return type:
  `IntervalResponse`
- Returns:
  Populated `IntervalResponse`.
- class twiga.serve.schemas.ModelPrediction(**data)#
Bases: `BaseModel`

Point predictions for a single model.
- Parameters:
ModelLoader#
ModelLoader handles lazy, cached checkpoint loading so that deserialisation
happens once at startup rather than on every request.
```python
from twiga.serve import ModelLoader

loader = ModelLoader(forecaster)
fc = loader.load()   # loads from checkpoints_path; cached on subsequent calls
loader.reload()      # force fresh load (e.g. after a new model version is saved)
```
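The double-load protection described above follows the standard lock-guarded lazy-initialisation pattern. A stdlib-only sketch of that pattern (illustrative, not twiga's actual code):

```python
import threading

class LazyLoader:
    """Load an expensive resource once, safely, across threads."""

    def __init__(self, factory):
        self._factory = factory
        self._lock = threading.Lock()
        self._resource = None
        self.load_count = 0

    def load(self):
        with self._lock:                # serialise first access
            if self._resource is None:  # only the first caller pays the cost
                self._resource = self._factory()
                self.load_count += 1
        return self._resource

    def reload(self):
        with self._lock:                # discard the cache and load fresh
            self._resource = self._factory()
            self.load_count += 1
        return self._resource

loader = LazyLoader(lambda: {"model": "checkpoint"})
threads = [threading.Thread(target=loader.load) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(loader.load_count)  # → 1
```

Without the lock, two threads could both observe `_resource is None` and deserialise the checkpoint twice.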
Key properties exposed after loading:
| Property | Type | Description |
|---|---|---|
| `targets` | `list[str]` | Target variable names |
| `forecast_horizon` | `int` | Steps ahead |
| `model_names` | `list[str]` | Names of all loaded models |
Deployment tips#
Pre-fork model loading - For multi-worker deployments use gunicorn with
--preload so the checkpoint is loaded once in the master process and forked
into each worker, avoiding repeated disk I/O:
```shell
gunicorn serve:app -k uvicorn.workers.UvicornWorker -w 4 --preload
```
Health checks - Wire `/health` to your container orchestrator's liveness
and readiness probes. The endpoint returns `status: "degraded"` if the model
has not been loaded successfully.
Hot reload - After saving a new checkpoint, trigger /reload via your
deployment pipeline instead of restarting the container.
See Pipeline for how training_flow saves checkpoints and
triggers /reload automatically via Prefect.