MLOps Architecture#

Source files
  • twiga/core/settings.py - TwigaSettings (pydantic-settings env config)
  • twiga/forecaster/base.py - on_save_checkpoint, on_load_checkpoint, RawPrediction
  • twiga/forecaster/result.py - RawPrediction, ForecastKind, ForecastResult
  • twiga/tracking/tracker.py - TwigaTracker
  • twiga/serve/app.py - create_app()
  • twiga/serve/loader.py - ModelLoader
  • twiga/serve/monitor.py - ForecastMonitor
  • twiga/serve/schemas.py - request / response Pydantic models
  • twiga/pipeline/schemas.py - DriftSummary, TrainingResult, RetrainingResult
  • twiga/pipeline/tasks.py - Prefect tasks
  • twiga/pipeline/flows.py - training_flow, retraining_flow

Overview#

Twiga’s MLOps layer is organised around four concerns that map directly to the four stages of a production ML lifecycle:

| Stage | Module | Tool | Key class |
| --- | --- | --- | --- |
| Experiment tracking | twiga.tracking | MLflow | TwigaTracker |
| Model serving | twiga.serve | FastAPI + anyio | create_app, ModelLoader |
| Production monitoring | twiga.serve.monitor | Evidently | ForecastMonitor |
| Orchestration | twiga.pipeline | Prefect | training_flow, retraining_flow |

Each module is independently installable and can be used in isolation, but they are designed to compose into a complete MLOps pipeline.

pip install "twiga[mlops]"    # quoted so shells such as zsh do not expand the brackets

Package structure#

twiga/
├── core/
│   └── settings.py         ← TwigaSettings  (TWIGA_* env vars via pydantic-settings)
│
├── forecaster/
│   ├── result.py           ← RawPrediction dataclass + ForecastKind enum
│   └── base.py             ← on_save_checkpoint / on_load_checkpoint hooks
│                             writes _checkpoint_manifest.json (versioned)
│
├── tracking/
│   ├── __init__.py
│   └── tracker.py          ← TwigaTracker (MLflow context manager)
│
├── serve/
│   ├── __init__.py
│   ├── schemas.py          ← ForecastRequest/Response, IntervalRequest/Response,
│   │                          HealthResponse, MonitorResponse  (Pydantic)
│   ├── loader.py           ← ModelLoader  (thread-safe lazy checkpoint loader)
│   ├── monitor.py          ← ForecastMonitor (Evidently drift + performance)
│   └── app.py              ← create_app()  FastAPI factory (async, anyio offload)
│
└── pipeline/
    ├── __init__.py
    ├── schemas.py          ← DriftSummary, TrainingResult, RetrainingResult (dataclasses)
    ├── tasks.py            ← Prefect tasks  (load_data, fit, evaluate, checkpoint, mlflow)
    └── flows.py            ← training_flow → TrainingResult
                              retraining_flow → RetrainingResult

Forecaster ↔ MLOps integration#

TwigaForecaster is the shared artefact that flows through every MLOps stage. The connection points are two lifecycle hooks defined on BaseForecaster, on_save_checkpoint and on_load_checkpoint:

        graph LR
    subgraph CORE ["twiga.forecaster (core)"]
        FC[TwigaForecaster]
        RP["_predict\n→ RawPrediction"]
        SCK[on_save_checkpoint]
        LCK[on_load_checkpoint]
        MFT["_checkpoint_manifest.json\nversion · model_type · saved_at"]
        FC -->|"fit / predict / evaluate"| RP
        FC --> SCK
        FC --> LCK
        SCK -->|"joblib.dump + manifest write"| MFT
        LCK -->|"reads manifest → loads pkl"| MFT
    end

    subgraph TRACK ["twiga.tracking"]
        TRK[TwigaTracker]
        FC -->|"log_forecaster_metadata\nlog_model\nlog_evaluation"| TRK
        TRK -->|"params · metrics · artefacts"| MLF[(MLflow Registry)]
    end

    subgraph SERVE ["twiga.serve"]
        LDR["ModelLoader\n(thread-safe · lazy)"]
        MFT -->|"_do_load → on_load_checkpoint"| LDR
        LDR -->|"fitted forecaster"| APP
        APP["create_app\nFastAPI async"]
        APP -->|"/predict\nRawPrediction.loc"| P[point forecast]
        APP -->|"/predict-interval\nRawPrediction bounds"| I[interval forecast]
    end

    subgraph PIPE ["twiga.pipeline"]
        TF["training_flow\n→ TrainingResult"]
        RF["retraining_flow\n→ RetrainingResult"]
        TF -->|"fit_forecaster"| FC
        TF -->|"save_checkpoint"| MFT
        TF -->|"log_to_mlflow"| TRK
        RF -->|"run_drift_check\n→ DriftSummary"| EV[ForecastMonitor]
        RF -->|"fit_forecaster"| FC
        RF -->|"save_checkpoint if promoted"| MFT
        RF -->|"POST /reload"| APP
    end
    

Checkpoint versioning#

Every on_save_checkpoint() call writes or updates {checkpoints_path}/_checkpoint_manifest.json:

{
  "version": 3,
  "checkpoint_file": "model_and_pipeline.pkl",
  "model_type": "lightgbm",
  "saved_at": "2024-06-01T12:00:00+00:00"
}

on_load_checkpoint() reads the manifest first. If the manifest is absent, it falls back to the most recently modified *.pkl file, which keeps checkpoints saved before the manifest was introduced loadable.
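
The write-then-resolve behaviour described above can be sketched with the standard library alone. This is an illustration, not Twiga's implementation: write_manifest and resolve_checkpoint are hypothetical helper names, and incrementing the version by exactly one per save is an assumption (the source only states that versions increase monotonically).

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path

MANIFEST_NAME = "_checkpoint_manifest.json"


def write_manifest(checkpoint_dir: Path, checkpoint_file: str, model_type: str) -> dict:
    """Write or update the manifest (assumption: version grows by one per save)."""
    manifest_path = checkpoint_dir / MANIFEST_NAME
    previous = json.loads(manifest_path.read_text()) if manifest_path.exists() else {}
    manifest = {
        "version": int(previous.get("version", 0)) + 1,
        "checkpoint_file": checkpoint_file,
        "model_type": model_type,
        "saved_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
    }
    manifest_path.write_text(json.dumps(manifest, indent=2))
    return manifest


def resolve_checkpoint(checkpoint_dir: Path) -> Path:
    """Prefer the manifest; fall back to the newest *.pkl for legacy checkpoints."""
    manifest_path = checkpoint_dir / MANIFEST_NAME
    if manifest_path.exists():
        manifest = json.loads(manifest_path.read_text())
        return checkpoint_dir / manifest["checkpoint_file"]
    candidates = sorted(checkpoint_dir.glob("*.pkl"), key=lambda p: p.stat().st_mtime)
    if not candidates:
        raise FileNotFoundError(f"no checkpoint found in {checkpoint_dir}")
    return candidates[-1]


with tempfile.TemporaryDirectory() as tmp:
    ckpt_dir = Path(tmp)
    (ckpt_dir / "model_and_pipeline.pkl").write_bytes(b"")  # placeholder artefact
    legacy = resolve_checkpoint(ckpt_dir)  # no manifest yet, so the mtime fallback is used
    first = write_manifest(ckpt_dir, "model_and_pipeline.pkl", "lightgbm")
    second = write_manifest(ckpt_dir, "model_and_pipeline.pkl", "lightgbm")
    resolved = resolve_checkpoint(ckpt_dir)  # manifest present, so it wins
```

Keeping the fallback inside the resolver means callers never need to know whether a directory predates the manifest.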


Component architecture#

        graph TD
    CFG["TwigaSettings\n(TWIGA_* env vars)"]

    subgraph DEV ["Development  -  fit & track"]
        FC[TwigaForecaster]
        TRK["TwigaTracker\nMLflow"]
        FC -->|"fit / evaluate\n→ RawPrediction"| TRK
        TRK -->|"params · metrics · checkpoint"| MLF[("MLflow Registry")]
    end

    subgraph ORCH ["Orchestration  -  Prefect"]
        TF["training_flow\n→ TrainingResult"]
        RF["retraining_flow\n→ RetrainingResult"]
        TF -->|"load_data"| D[("Dataset")]
        TF -->|"fit_forecaster"| FC
        TF -->|"evaluate_model"| M[Metrics DF]
        TF -->|"save_checkpoint"| CK[("Checkpoint Dir\n_manifest.json")]
        TF -->|"log_to_mlflow"| MLF
        RF -->|"run_drift_check\n→ DriftSummary"| EV[Evidently]
        RF -->|"fit + evaluate"| FC
        RF -->|"promote if better"| CK
    end

    subgraph SERVE ["Serving  -  FastAPI + anyio"]
        APP[create_app]
        LDR["ModelLoader\nLock · double-check"]
        MON["ForecastMonitor\nEvidently"]
        CK -->|"on_load_checkpoint"| LDR
        LDR -->|"fitted forecaster"| APP
        APP -->|"/predict\n(anyio thread)"| RP[RawPrediction.loc]
        APP -->|"/predict-interval\n(anyio thread)"| RI[RawPrediction bounds]
        APP -->|"/monitor/drift"| MON
        APP -->|"/monitor/performance"| MON
        APP -->|"/reload"| LDR
    end

    CFG -.->|"mlflow_tracking_uri"| DEV
    CFG -.->|"serve_api_key\nmodel_checkpoint_dir\nserve_report_dir"| SERVE
    DEV --> ORCH
    ORCH --> SERVE
    

Data contracts#

All inter-module boundaries use typed dataclasses or Pydantic models - no raw dict[str, Any] at the seams.

Pipeline return types (twiga/pipeline/schemas.py)#

from dataclasses import dataclass, field

@dataclass
class DriftSummary:
    drift_detected: bool = False
    dataset_drift_score: float = 0.0
    n_drifted: int = 0
    total: int = 0
    feature_drift: dict[str, float] = field(default_factory=dict)
    report_path: str | None = None
    timestamp: str = ""

@dataclass
class TrainingResult:
    checkpoint_path: str
    mlflow_run_id: str
    metrics: dict[str, dict[str, float]]   # {model: {metric: value}}

@dataclass
class RetrainingResult:
    retrained: bool
    promoted: bool
    drift_summary: DriftSummary
    checkpoint_path: str | None = None
    mlflow_run_id: str | None = None
    metrics: dict[str, dict[str, float]] | None = None

Serving schemas (twiga/serve/schemas.py)#

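from typing import Any, Literal

from pydantic import BaseModel

# ModelPrediction and IntervalPrediction are sibling Pydantic models not shown in this excerpt.
class ForecastRequest(BaseModel):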
    records: list[dict[str, Any]]   # min_length=1
    ensemble_strategy: str | None = None
    prepare_test_data: bool = True

class ForecastResponse(BaseModel):
    forecasts: list[ModelPrediction]   # [{model, predictions, inference_time}]

class IntervalResponse(BaseModel):
    forecasts: list[IntervalPrediction]  # [{model, lower, forecast, upper, coverage}]

class HealthResponse(BaseModel):
    status: Literal["ok", "degraded"]
    models: list[str]
    forecast_horizon: int
    targets: list[str]

Prediction flow inside serving#

        graph LR
    REQ["ForecastRequest\n(records: list[dict])"]
    DF["pd.DataFrame\n_records_to_df()"]
    THR["anyio.to_thread\n.run_sync(fc.predict)"]
    RAW["RawPrediction\n(loc, kind, lower, upper…)"]
    RESP["ForecastResponse\n.from_arrays()"]

    REQ --> DF --> THR --> RAW --> RESP
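
As a rough, dependency-free illustration of the flow in the diagram, the sketch below uses asyncio.to_thread from the standard library as a stand-in for anyio.to_thread.run_sync, and plain column lists as a stand-in for the pandas DataFrame. RawPredictionSketch, records_to_columns, blocking_predict and handle_predict are hypothetical names, and the "naive" model is a toy.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class RawPredictionSketch:
    """Hypothetical stand-in for RawPrediction; only `loc` and `kind` are modelled."""
    loc: list[float]
    kind: str = "point"


def records_to_columns(records: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Stand-in for _records_to_df(): pivot row records into named columns.

    Assumes every record carries the same keys.
    """
    columns: dict[str, list[Any]] = {}
    for row in records:
        for key, value in row.items():
            columns.setdefault(key, []).append(value)
    return columns


def blocking_predict(columns: dict[str, list[Any]]) -> RawPredictionSketch:
    """Toy synchronous model: repeat the last observed `load` three times."""
    last = float(columns["load"][-1])
    return RawPredictionSketch(loc=[last, last, last])


async def handle_predict(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Request -> columns -> worker thread -> raw prediction -> response payload."""
    columns = records_to_columns(records)
    # The blocking call runs in a worker thread so the event loop stays free.
    raw = await asyncio.to_thread(blocking_predict, columns)
    return {"forecasts": [{"model": "naive", "predictions": raw.loc}]}


response = asyncio.run(handle_predict([{"load": 10.0}, {"load": 12.5}]))
```

The handler itself stays async and only awaits the result, which is the property the anyio offload is meant to preserve in the real service.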
    

Data flow - complete production cycle#

        sequenceDiagram
    autonumber
    participant DS  as Dataset
    participant PF  as Prefect
    participant FC  as TwigaForecaster
    participant CK  as Checkpoint + Manifest
    participant ML  as MLflow
    participant API as FastAPI
    participant EV  as Evidently
    participant OPS as Ops / scheduler

    Note over PF,FC: training_flow → TrainingResult
    PF->>DS: load_data()
    DS-->>PF: train_df, test_df
    PF->>FC: fit_forecaster()
    FC-->>PF: fitted forecaster
    PF->>FC: evaluate_model() → RawPrediction
    FC-->>PF: predictions_df, metrics_df
    PF->>CK: save_checkpoint() → manifest v1
    PF->>ML: log_to_mlflow() via TwigaTracker
    ML-->>PF: run_id → TrainingResult

    Note over API,CK: API startup
    API->>CK: ModelLoader.load() reads manifest
    CK-->>API: forecaster (thread-safe, loaded once)

    Note over API,EV: Production serving
    API->>FC: POST /predict (anyio thread)
    FC-->>API: RawPrediction.loc → ForecastResponse
    API->>EV: POST /monitor/drift
    EV-->>API: DriftSummary via ForecastMonitor

    Note over OPS,PF: Scheduled retraining → RetrainingResult
    OPS->>PF: trigger retraining_flow
    PF->>EV: run_drift_check() → DriftSummary
    EV-->>PF: drift_detected=True
    PF->>FC: fit_forecaster() on current window
    PF->>FC: evaluate_model() → new metrics
    FC-->>PF: new_metric < incumbent → promoted=True
    PF->>CK: save_checkpoint() → manifest v2
    PF->>ML: log_to_mlflow() → new run_id
    PF->>API: POST /reload → ModelLoader.reload()
    API->>CK: re-reads manifest v2, loads new pkl
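
The retraining branch of the sequence above reduces to a small decision rule. The sketch below is one possible reading, under two assumptions that the diagram implies but does not state: retraining is skipped when no drift is detected, and "better" means a strictly lower error metric. The dataclasses are trimmed copies of the pipeline schemas, retrain_if_drifted is a hypothetical function, and the final POST /reload step is left out.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class DriftSummary:
    drift_detected: bool = False
    dataset_drift_score: float = 0.0


@dataclass
class RetrainingResult:
    retrained: bool
    promoted: bool
    drift_summary: DriftSummary
    checkpoint_path: Optional[str] = None


def retrain_if_drifted(
    drift: DriftSummary,
    incumbent_error: float,
    fit_and_evaluate: Callable[[], float],
    save_checkpoint: Callable[[], str],
) -> RetrainingResult:
    """Retrain only on drift; promote only when the candidate's error is lower."""
    if not drift.drift_detected:
        return RetrainingResult(retrained=False, promoted=False, drift_summary=drift)
    candidate_error = fit_and_evaluate()
    if candidate_error >= incumbent_error:
        # Candidate is no better, so the incumbent checkpoint is left untouched.
        return RetrainingResult(retrained=True, promoted=False, drift_summary=drift)
    path = save_checkpoint()  # only a promoted model writes a new checkpoint
    return RetrainingResult(
        retrained=True, promoted=True, drift_summary=drift, checkpoint_path=path
    )


drifted = DriftSummary(drift_detected=True, dataset_drift_score=0.6)
better = retrain_if_drifted(
    drifted, 0.20, lambda: 0.15, lambda: "checkpoints/model_and_pipeline.pkl"
)
worse = retrain_if_drifted(drifted, 0.20, lambda: 0.25, lambda: "unused")
stable = retrain_if_drifted(DriftSummary(), 0.20, lambda: 0.0, lambda: "unused")
```

Passing the fit and save steps in as callables keeps the decision rule testable without a forecaster or a filesystem.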
    

Configuration - TwigaSettings#

All runtime configuration is read from the environment through TwigaSettings, following 12-factor principles:

from twiga.core.settings import TwigaSettings

settings = TwigaSettings()          # reads TWIGA_* env vars or .env file

| Environment variable | Default | Used by |
| --- | --- | --- |
| TWIGA_MLFLOW_TRACKING_URI | http://localhost:5000 | TwigaTracker |
| TWIGA_MLFLOW_EXPERIMENT | twiga | TwigaTracker |
| TWIGA_SERVE_API_KEY | "" (open access) | create_app |
| TWIGA_MODEL_CHECKPOINT_DIR | checkpoints | ForecasterConfig |
| TWIGA_SERVE_REPORT_DIR | reports | ForecastMonitor |
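
To make the override behaviour concrete without depending on pydantic-settings, here is a standard-library stand-in. SettingsSketch and from_env are hypothetical names, and the field names are assumed to be the lower-cased variable names with the TWIGA_ prefix removed. The real TwigaSettings additionally reads a .env file, which this sketch does not.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping, Optional


@dataclass(frozen=True)
class SettingsSketch:
    """Stdlib stand-in for TwigaSettings; defaults copied from the table above."""
    mlflow_tracking_uri: str = "http://localhost:5000"
    mlflow_experiment: str = "twiga"
    serve_api_key: str = ""  # empty string means open access
    model_checkpoint_dir: str = "checkpoints"
    serve_report_dir: str = "reports"

    @classmethod
    def from_env(
        cls, environ: Optional[Mapping[str, str]] = None, prefix: str = "TWIGA_"
    ) -> "SettingsSketch":
        """Override each default with PREFIX + FIELD_NAME when that variable is set."""
        environ = os.environ if environ is None else environ
        overrides = {
            f.name: environ[prefix + f.name.upper()]
            for f in fields(cls)
            if prefix + f.name.upper() in environ
        }
        return cls(**overrides)


# An explicit mapping keeps the example independent of the real environment.
settings = SettingsSketch.from_env({"TWIGA_SERVE_API_KEY": "s3cret"})
```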


Design principles#

Config-driven

TwigaTracker, ForecastMonitor, and the Prefect flows all accept plain Python arguments. TwigaSettings provides 12-factor env-variable overrides for deployment without code changes.

Typed contracts at every boundary

Pipeline flows return dataclasses (TrainingResult, RetrainingResult, DriftSummary). The serving layer uses Pydantic models for all HTTP payloads. Prediction internals use RawPrediction to carry typed forecast state through _predict() → _rescale_raw() → HTTP response.

Thread-safe async serving

ModelLoader uses double-checked locking so concurrent requests cannot trigger a double-load race. CPU-bound fc.predict() calls are offloaded to a thread pool via anyio.to_thread.run_sync to keep the async event loop responsive under production load.
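
Double-checked locking itself is small enough to show in full. The class below is a generic sketch of the pattern, not the actual ModelLoader: LazyLoader, slow_load and load_calls are hypothetical names, and the load function is a placeholder for reading a checkpoint.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional


class LazyLoader:
    """Load a resource at most once, even when many threads ask at the same time."""

    def __init__(self, load_fn: Callable[[], Any]) -> None:
        self._load_fn = load_fn
        self._lock = threading.Lock()
        self._model: Optional[Any] = None

    def load(self) -> Any:
        if self._model is None:          # first check, lock-free fast path
            with self._lock:
                if self._model is None:  # second check, now holding the lock
                    self._model = self._load_fn()
        return self._model

    def reload(self) -> Any:
        """Force a fresh load, e.g. after a new checkpoint has been promoted."""
        with self._lock:
            self._model = self._load_fn()
            return self._model


load_calls: list[int] = []


def slow_load() -> object:
    """Placeholder for an expensive checkpoint load."""
    load_calls.append(1)
    time.sleep(0.05)
    return object()


loader = LazyLoader(slow_load)
with ThreadPoolExecutor(max_workers=8) as pool:
    models = list(pool.map(lambda _: loader.load(), range(8)))
```

The second check is what prevents threads that queued on the lock during the first load from loading again once they acquire it.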

Composable and independently installable

Each module works standalone. You can use TwigaTracker without FastAPI, run ForecastMonitor without Prefect, and schedule flows without the serving layer. Lazy imports ensure that import twiga never fails due to missing optional MLOps dependencies.
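
One common way to get this behaviour is to defer the import of each optional dependency until the feature that needs it is first used. The helper below is a hypothetical sketch of that idea, not Twiga's code; the extra name in the error message is taken from the install command earlier on this page.

```python
import importlib
from types import ModuleType


def require_optional(module_name: str, extra: str = "mlops") -> ModuleType:
    """Import an optional dependency on first use, with an actionable error."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"'{module_name}' is required for this feature; "
            f"install it with: pip install 'twiga[{extra}]'"
        ) from exc


# A module that is always available imports normally.
json_module = require_optional("json")

# A missing module fails only at the point of use, with an install hint.
try:
    require_optional("definitely_not_installed_pkg_xyz")
    message = ""
except ImportError as exc:
    message = str(exc)
```

Because the import happens inside the function, the top-level package can be imported even when the optional dependencies are absent.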

Versioned checkpoints

_checkpoint_manifest.json gives every checkpoint a monotonically increasing version number, model type, and UTC timestamp. This makes rollbacks auditable and avoids the fragility of selecting checkpoints by file modification time, which is now used only as a fallback for checkpoints that predate the manifest.