# MLOps Architecture

## Source files

- `twiga/core/settings.py` – `TwigaSettings` (pydantic-settings env config)
- `twiga/forecaster/base.py` – `on_save_checkpoint`, `on_load_checkpoint`, `RawPrediction`
- `twiga/forecaster/result.py` – `RawPrediction`, `ForecastKind`, `ForecastResult`
- `twiga/tracking/tracker.py` – `TwigaTracker`
- `twiga/serve/app.py` – `create_app()`
- `twiga/serve/loader.py` – `ModelLoader`
- `twiga/serve/monitor.py` – `ForecastMonitor`
- `twiga/serve/schemas.py` – request/response Pydantic models
- `twiga/pipeline/schemas.py` – `DriftSummary`, `TrainingResult`, `RetrainingResult`
- `twiga/pipeline/tasks.py` – Prefect tasks
- `twiga/pipeline/flows.py` – `training_flow`, `retraining_flow`
## Overview

Twiga’s MLOps layer is organised around four concerns that map directly to the four stages of a production ML lifecycle:
| Stage | Module | Tool | Key class |
|---|---|---|---|
| Experiment tracking | `twiga.tracking` | MLflow | `TwigaTracker` |
| Model serving | `twiga.serve` | FastAPI + anyio | `ModelLoader`, `create_app()` |
| Production monitoring | `twiga.serve` | Evidently | `ForecastMonitor` |
| Orchestration | `twiga.pipeline` | Prefect | `training_flow`, `retraining_flow` |
Each module is independently installable and usable in isolation, but the modules are designed to compose into a complete MLOps pipeline:
```shell
pip install twiga[mlops]
```
## Package structure

```text
twiga/
├── core/
│   └── settings.py   ← TwigaSettings (TWIGA_* env vars via pydantic-settings)
│
├── forecaster/
│   ├── result.py     ← RawPrediction dataclass + ForecastKind enum
│   └── base.py       ← on_save_checkpoint / on_load_checkpoint hooks
│                       writes _checkpoint_manifest.json (versioned)
│
├── tracking/
│   ├── __init__.py
│   └── tracker.py    ← TwigaTracker (MLflow context manager)
│
├── serve/
│   ├── __init__.py
│   ├── schemas.py    ← ForecastRequest/Response, IntervalRequest/Response,
│   │                   HealthResponse, MonitorResponse (Pydantic)
│   ├── loader.py     ← ModelLoader (thread-safe lazy checkpoint loader)
│   ├── monitor.py    ← ForecastMonitor (Evidently drift + performance)
│   └── app.py        ← create_app() FastAPI factory (async, anyio offload)
│
└── pipeline/
    ├── __init__.py
    ├── schemas.py    ← DriftSummary, TrainingResult, RetrainingResult (dataclasses)
    ├── tasks.py      ← Prefect tasks (load_data, fit, evaluate, checkpoint, mlflow)
    └── flows.py      ← training_flow → TrainingResult
                        retraining_flow → RetrainingResult
```
## Forecaster ↔ MLOps integration

The `TwigaForecaster` is the shared artefact that flows through every MLOps stage. The connection points are two lifecycle hooks defined on `BaseForecaster`:
```mermaid
graph LR
    subgraph CORE ["twiga.forecaster (core)"]
        FC[TwigaForecaster]
        RP["_predict\n→ RawPrediction"]
        SCK[on_save_checkpoint]
        LCK[on_load_checkpoint]
        MFT["_checkpoint_manifest.json\nversion · model_type · saved_at"]
        FC -->|"fit / predict / evaluate"| RP
        FC --> SCK
        FC --> LCK
        SCK -->|"joblib.dump + manifest write"| MFT
        LCK -->|"reads manifest → loads pkl"| MFT
    end
    subgraph TRACK ["twiga.tracking"]
        TRK[TwigaTracker]
        FC -->|"log_forecaster_metadata\nlog_model\nlog_evaluation"| TRK
        TRK -->|"params · metrics · artefacts"| MLF[(MLflow Registry)]
    end
    subgraph SERVE ["twiga.serve"]
        LDR["ModelLoader\n(thread-safe · lazy)"]
        MFT -->|"_do_load → on_load_checkpoint"| LDR
        LDR -->|"fitted forecaster"| APP
        APP["create_app\nFastAPI async"]
        APP -->|"/predict\nRawPrediction.loc"| P[point forecast]
        APP -->|"/predict-interval\nRawPrediction bounds"| I[interval forecast]
    end
    subgraph PIPE ["twiga.pipeline"]
        TF["training_flow\n→ TrainingResult"]
        RF["retraining_flow\n→ RetrainingResult"]
        TF -->|"fit_forecaster"| FC
        TF -->|"save_checkpoint"| MFT
        TF -->|"log_to_mlflow"| TRK
        RF -->|"run_drift_check\n→ DriftSummary"| EV[ForecastMonitor]
        RF -->|"fit_forecaster"| FC
        RF -->|"save_checkpoint if promoted"| MFT
        RF -->|"POST /reload"| APP
    end
```
## Checkpoint versioning

Every `on_save_checkpoint()` call writes or updates
`{checkpoints_path}/_checkpoint_manifest.json`:

```json
{
  "version": 3,
  "checkpoint_file": "model_and_pipeline.pkl",
  "model_type": "lightgbm",
  "saved_at": "2024-06-01T12:00:00+00:00"
}
```

`on_load_checkpoint()` reads the manifest first; if it is absent, it falls back to the most-recently-modified `*.pkl` file for backward compatibility with checkpoints saved before the manifest was introduced.
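The manifest lifecycle can be sketched in a few lines. `write_manifest` and `resolve_checkpoint` below are illustrative helpers, not twiga's actual API, but they follow the version-bump and manifest-first rules described above:

```python
import json
from datetime import datetime, timezone
from pathlib import Path

MANIFEST = "_checkpoint_manifest.json"


def write_manifest(root: Path, checkpoint_file: str, model_type: str) -> dict:
    """Create or bump the manifest, mirroring the save-side behaviour above."""
    path = root / MANIFEST
    version = 1
    if path.exists():
        version = json.loads(path.read_text())["version"] + 1
    meta = {
        "version": version,
        "checkpoint_file": checkpoint_file,
        "model_type": model_type,
        "saved_at": datetime.now(timezone.utc).isoformat(),
    }
    path.write_text(json.dumps(meta, indent=2))
    return meta


def resolve_checkpoint(root: Path) -> Path:
    """Manifest-first lookup, with an mtime fallback for pre-manifest dirs."""
    path = root / MANIFEST
    if path.exists():
        return root / json.loads(path.read_text())["checkpoint_file"]
    # No manifest: fall back to the newest *.pkl by modification time.
    candidates = sorted(root.glob("*.pkl"), key=lambda p: p.stat().st_mtime)
    if not candidates:
        raise FileNotFoundError(f"no checkpoint found in {root}")
    return candidates[-1]
```

Once a manifest exists, it wins over mtime ordering, which is exactly what makes the scheme robust to clock skew and file copies.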
## Component architecture

```mermaid
graph TD
    CFG["TwigaSettings\n(TWIGA_* env vars)"]
    subgraph DEV ["Development - fit & track"]
        FC[TwigaForecaster]
        TRK["TwigaTracker\nMLflow"]
        FC -->|"fit / evaluate\n→ RawPrediction"| TRK
        TRK -->|"params · metrics · checkpoint"| MLF[("MLflow Registry")]
    end
    subgraph ORCH ["Orchestration - Prefect"]
        TF["training_flow\n→ TrainingResult"]
        RF["retraining_flow\n→ RetrainingResult"]
        TF -->|"load_data"| D[("Dataset")]
        TF -->|"fit_forecaster"| FC
        TF -->|"evaluate_model"| M[Metrics DF]
        TF -->|"save_checkpoint"| CK[("Checkpoint Dir\n_manifest.json")]
        TF -->|"log_to_mlflow"| MLF
        RF -->|"run_drift_check\n→ DriftSummary"| EV[Evidently]
        RF -->|"fit + evaluate"| FC
        RF -->|"promote if better"| CK
    end
    subgraph SERVE ["Serving - FastAPI + anyio"]
        APP[create_app]
        LDR["ModelLoader\nLock · double-check"]
        MON["ForecastMonitor\nEvidently"]
        CK -->|"on_load_checkpoint"| LDR
        LDR -->|"fitted forecaster"| APP
        APP -->|"/predict\n(anyio thread)"| RP[RawPrediction.loc]
        APP -->|"/predict-interval\n(anyio thread)"| RI[RawPrediction bounds]
        APP -->|"/monitor/drift"| MON
        APP -->|"/monitor/performance"| MON
        APP -->|"/reload"| LDR
    end
    CFG -.->|"mlflow_tracking_uri\nserve_api_key"| DEV
    CFG -.->|"model_checkpoint_dir\nserve_report_dir"| SERVE
    DEV --> ORCH
    ORCH --> SERVE
```
## Data contracts

All inter-module boundaries use typed dataclasses or Pydantic models; there is no raw `dict[str, Any]` at the seams.
### Pipeline return types (`twiga/pipeline/schemas.py`)

```python
from dataclasses import dataclass, field


@dataclass
class DriftSummary:
    drift_detected: bool = False
    dataset_drift_score: float = 0.0
    n_drifted: int = 0
    total: int = 0
    feature_drift: dict[str, float] = field(default_factory=dict)
    report_path: str | None = None
    timestamp: str = ""


@dataclass
class TrainingResult:
    checkpoint_path: str
    mlflow_run_id: str
    metrics: dict[str, dict[str, float]]  # {model: {metric: value}}


@dataclass
class RetrainingResult:
    retrained: bool
    promoted: bool
    drift_summary: DriftSummary
    checkpoint_path: str | None = None
    mlflow_run_id: str | None = None
    metrics: dict[str, dict[str, float]] | None = None
```
### Serving schemas (`twiga/serve/schemas.py`)

```python
from typing import Any, Literal

from pydantic import BaseModel


class ForecastRequest(BaseModel):
    records: list[dict[str, Any]]  # min_length=1
    ensemble_strategy: str | None = None
    prepare_test_data: bool = True


class ForecastResponse(BaseModel):
    forecasts: list[ModelPrediction]  # [{model, predictions, inference_time}]


class IntervalResponse(BaseModel):
    forecasts: list[IntervalPrediction]  # [{model, lower, forecast, upper, coverage}]


class HealthResponse(BaseModel):
    status: Literal["ok", "degraded"]
    models: list[str]
    forecast_horizon: int
    targets: list[str]
```
## Prediction flow inside serving

```mermaid
graph LR
    REQ["ForecastRequest\n(records: list[dict])"]
    DF["pd.DataFrame\n_records_to_df()"]
    THR["anyio.to_thread\n.run_sync(fc.predict)"]
    RAW["RawPrediction\n(loc, kind, lower, upper…)"]
    RESP["ForecastResponse\n.from_arrays()"]
    REQ --> DF --> THR --> RAW --> RESP
```
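The offload step is the part worth dwelling on: the CPU-bound predict call must leave the event loop. The sketch below uses the stdlib `asyncio.to_thread` so it runs without dependencies (twiga itself uses `anyio.to_thread.run_sync`, which serves the same purpose); `predict_sync` is a hypothetical stand-in for `fc.predict()`:

```python
import asyncio


def predict_sync(records: list[dict]) -> list[float]:
    """Stand-in for the CPU-bound fc.predict() call (illustrative only)."""
    return [float(sum(r.values())) for r in records]


async def predict_endpoint(records: list[dict]) -> list[float]:
    # Offload the blocking call to a worker thread so the async event
    # loop stays responsive while the model computes.
    return await asyncio.to_thread(predict_sync, records)
```

While the worker thread crunches numbers, the event loop is free to accept other requests, which is what keeps p99 latency stable under concurrent load.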
## Data flow - complete production cycle

```mermaid
sequenceDiagram
    autonumber
    participant DS as Dataset
    participant PF as Prefect
    participant FC as TwigaForecaster
    participant CK as Checkpoint + Manifest
    participant ML as MLflow
    participant API as FastAPI
    participant EV as Evidently
    participant OPS as Ops / scheduler
    Note over PF,FC: training_flow → TrainingResult
    PF->>DS: load_data()
    DS-->>PF: train_df, test_df
    PF->>FC: fit_forecaster()
    FC-->>PF: fitted forecaster
    PF->>FC: evaluate_model() → RawPrediction
    FC-->>PF: predictions_df, metrics_df
    PF->>CK: save_checkpoint() → manifest v1
    PF->>ML: log_to_mlflow() via TwigaTracker
    ML-->>PF: run_id → TrainingResult
    Note over API,CK: API startup
    API->>CK: ModelLoader.load() reads manifest
    CK-->>API: forecaster (thread-safe, loaded once)
    Note over API,EV: Production serving
    API->>FC: POST /predict (anyio thread)
    FC-->>API: RawPrediction.loc → ForecastResponse
    API->>EV: POST /monitor/drift
    EV-->>API: DriftSummary via ForecastMonitor
    Note over OPS,PF: Scheduled retraining → RetrainingResult
    OPS->>PF: trigger retraining_flow
    PF->>EV: run_drift_check() → DriftSummary
    EV-->>PF: drift_detected=True
    PF->>FC: fit_forecaster() on current window
    PF->>FC: evaluate_model() → new metrics
    FC-->>PF: new_metric < incumbent → promoted=True
    PF->>CK: save_checkpoint() → manifest v2
    PF->>ML: log_to_mlflow() → new run_id
    PF->>API: POST /reload → ModelLoader.reload()
    API->>CK: re-reads manifest v2, loads new pkl
```
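The gating logic in the retraining branch above (retrain only on drift, promote only when the new model's error beats the incumbent's) condenses into a small decision function. The function and its parameter names are illustrative, not twiga's API; the real flow compares the metrics returned by `evaluate_model`:

```python
def retraining_decision(
    drift_detected: bool, new_error: float, incumbent_error: float
) -> dict:
    """Sketch of the retraining_flow gate: no drift means no retrain;
    on drift, promote only a strictly better (lower-error) model."""
    if not drift_detected:
        return {"retrained": False, "promoted": False}
    return {"retrained": True, "promoted": new_error < incumbent_error}
```

The strict `<` comparison means ties keep the incumbent, a conservative default that avoids churning checkpoints for no measurable gain.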
## Configuration - TwigaSettings

All runtime configuration follows 12-factor principles via `TwigaSettings`:

```python
from twiga.core.settings import TwigaSettings

settings = TwigaSettings()  # reads TWIGA_* env vars or a .env file
```
| Environment variable | Default | Used by |
|---|---|---|
| `TWIGA_MLFLOW_TRACKING_URI` | | tracking (`TwigaTracker`) |
| `TWIGA_SERVE_API_KEY` | | serving (`create_app`) |
| `TWIGA_MODEL_CHECKPOINT_DIR` | | serving (`ModelLoader`) |
| `TWIGA_SERVE_REPORT_DIR` | | serving (`ForecastMonitor`) |
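For intuition, the `TWIGA_` prefix convention maps an environment variable to a settings field by stripping the prefix and lowercasing the remainder. `read_twiga_env` below is a hypothetical helper showing that mapping, not part of twiga; pydantic-settings does this internally via its `env_prefix` configuration:

```python
def read_twiga_env(environ: dict[str, str], prefix: str = "TWIGA_") -> dict[str, str]:
    """Map TWIGA_-prefixed environment variables to settings field names,
    mimicking the pydantic-settings env_prefix convention (illustrative)."""
    return {
        key[len(prefix):].lower(): value
        for key, value in environ.items()
        if key.startswith(prefix)
    }
```

So exporting `TWIGA_MLFLOW_TRACKING_URI` populates the `mlflow_tracking_uri` field, and deployments can reconfigure every component without touching code.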
## Design principles

- **Config-driven.** `TwigaTracker`, `ForecastMonitor`, and the Prefect flows all accept plain Python arguments. `TwigaSettings` provides 12-factor env-variable overrides for deployment without code changes.
- **Typed contracts at every boundary.** Pipeline flows return dataclasses (`TrainingResult`, `RetrainingResult`, `DriftSummary`). The serving layer uses Pydantic models for all HTTP payloads. Prediction internals use `RawPrediction` to carry typed forecast state through `_predict()` → `_rescale_raw()` → HTTP response.
- **Thread-safe async serving.** `ModelLoader` uses double-checked locking so concurrent requests cannot trigger a double-load race. CPU-bound `fc.predict()` calls are offloaded to a thread pool via `anyio.to_thread.run_sync` to keep the async event loop responsive under production load.
- **Composable and independently installable.** Each module works standalone: you can use `TwigaTracker` without FastAPI, run `ForecastMonitor` without Prefect, and schedule flows without the serving layer. Lazy imports ensure that `import twiga` never fails due to missing optional MLOps dependencies.
- **Versioned checkpoints.** `_checkpoint_manifest.json` gives every checkpoint a monotonically increasing version number, a model type, and a UTC timestamp. This makes rollbacks auditable and removes the fragility of selecting checkpoints by file modification time alone.
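The double-checked locking mentioned above can be illustrated with a minimal stdlib sketch. `LazyLoader` is a hypothetical class, not the actual `ModelLoader`, but it shows the check-lock-recheck shape that prevents two threads from loading the checkpoint twice:

```python
import threading


class LazyLoader:
    """Double-checked locking: load at most once under concurrency."""

    def __init__(self, load_fn):
        self._load_fn = load_fn
        self._lock = threading.Lock()
        self._model = None

    def get(self):
        # First check without the lock: the cheap fast path once loaded.
        if self._model is None:
            with self._lock:
                # Second check under the lock: another thread may have
                # finished loading while this one waited for the lock.
                if self._model is None:
                    self._model = self._load_fn()
        return self._model
```

The first unlocked check makes the hot path lock-free after startup; the second check under the lock closes the race window between the first check and lock acquisition.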