Pipeline Orchestration#

Source files
  • twiga/pipeline/tasks.py - Prefect tasks

  • twiga/pipeline/flows.py - training_flow, retraining_flow

  • twiga/pipeline/schemas.py - TrainingResult, RetrainingResult, DriftSummary

Overview#

twiga.pipeline provides two Prefect flows that orchestrate the complete Twiga training and retraining lifecycle. Each stage of the pipeline is a separate Prefect task, enabling independent retries, logging, and observability in the Prefect UI.

Both flows return typed dataclasses so downstream code gets IDE completion and type checking with no dict-key guesswork.

The pipeline extras are installed with:

pip install twiga[mlops]

training_flow#

The full train → evaluate → checkpoint → MLflow pipeline in one call:

from twiga.pipeline import training_flow
from twiga.pipeline.schemas import TrainingResult

result: TrainingResult = training_flow(
    forecaster=forecaster,
    data_path="data/load.parquet",
    date_col="timestamp",
    train_end="2023-12-31",
    test_start="2024-01-01",
    tune=True,
    num_trials=30,
    experiment="load-forecast-v2",
    run_name="lgbm-run-001",
)

print(result.checkpoint_path)
print(result.mlflow_run_id)
print(result.metrics)

Steps#

graph LR
    A[load_data] --> B[fit_forecaster]
    B --> C[evaluate_model]
    C --> D[save_checkpoint]
    D --> E[log_to_mlflow]
    E --> F[publish_artifact]

| Step | Task | Description |
| --- | --- | --- |
| 1 | load_data | Read parquet/CSV, parse timestamps, split train/test |
| 2 | fit_forecaster | Optionally tune with Optuna, then fit on training data |
| 3 | evaluate_model | Run forecaster.evaluate() on test split |
| 4 | save_checkpoint | Persist model + write _checkpoint_manifest.json |
| 5 | log_to_mlflow | Log params, metrics, and checkpoint artifacts via TwigaTracker |
| 6 | publish_artifact | Publish a Markdown metrics table in the Prefect UI |
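
For orientation, steps 1–5 can be sketched by calling each task's underlying function directly via .fn (see Tasks below); step 6's Prefect artifact is omitted here, and forecaster is assumed to be a configured TwigaForecaster:

from twiga.pipeline.tasks import (
    evaluate_model,
    fit_forecaster,
    load_data,
    log_to_mlflow,
    save_checkpoint,
)

# Steps 1-2: load and split the data, then fit (tuning skipped for brevity)
train_df, test_df = load_data.fn(
    "data/load.parquet", train_end="2023-12-31", test_start="2024-01-01"
)
fitted = fit_forecaster.fn(forecaster, train_df)

# Steps 3-5: evaluate, checkpoint, and track
predictions_df, metrics_df = evaluate_model.fn(fitted, test_df)
checkpoint_path = save_checkpoint.fn(fitted)
run_id = log_to_mlflow.fn(
    fitted, metrics_df, predictions_df, experiment="load-forecast-v2"
)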

Return value - TrainingResult#

from twiga.pipeline.schemas import TrainingResult

result.checkpoint_path   # "/checkpoints/load-forecast/20240101/"
result.mlflow_run_id     # "a1b2c3d4e5f6"
result.metrics           # {"lightgbm": {"mae": 4.23, "rmse": 6.11, "corr": 0.97}}

API reference#

twiga.pipeline.flows.training_flow(forecaster, data_path, date_col='timestamp', train_end=None, test_start=None, tune=False, num_trials=20, ensemble_strategy=None, experiment='twiga', run_name=None, tags=None, report_dir='reports')#

Full training pipeline: load → fit → evaluate → checkpoint → MLflow.

Parameters:
  • forecaster (TwigaForecaster) – Configured (unfitted) forecaster instance.

  • data_path (str | Path) – Path to the dataset file (.parquet or .csv).

  • date_col (str) – Name of the timestamp column in the dataset.

  • train_end (str | None) – Inclusive end date for the training split (YYYY-MM-DD).

  • test_start (str | None) – Inclusive start date for the test split (YYYY-MM-DD).

  • tune (bool) – If True, run Optuna HPO before the final fit.

  • num_trials (int) – Number of Optuna trials (used when tune=True).

  • ensemble_strategy (str | None) – Ensemble aggregation strategy for evaluation.

  • experiment (str) – MLflow experiment name.

  • run_name (str | None) – Human-readable MLflow run label.

  • tags (dict[str, str] | None) – Optional MLflow tags dict.

  • report_dir (str | Path) – Directory for Evidently HTML reports.

Return type:

TrainingResult

Returns:

TrainingResult with checkpoint_path, mlflow_run_id, and metrics.


retraining_flow#

Drift-aware retraining that only promotes the new model if it outperforms the incumbent:

from twiga.pipeline import retraining_flow
from twiga.pipeline.schemas import RetrainingResult

result: RetrainingResult = retraining_flow(
    forecaster=forecaster,
    reference_data_path="data/train.parquet",
    current_data_path="data/current_window.parquet",
    drift_threshold=0.5,          # retrain if > 50% of features drifted
    performance_threshold=0.15,   # also retrain if incumbent MAE > 0.15
    metric_key="mae",
    experiment="load-forecast-retrain",
)

print(result.retrained, result.promoted)
print(result.drift_summary.drift_detected)
print(result.drift_summary.n_drifted)

Decision logic#

graph TD
    A[load reference + current data] --> B[run_drift_check\n→ DriftSummary]
    B --> C{drift_fraction\n> threshold?}
    C -->|No| D[evaluate incumbent]
    C -->|Yes| G
    D --> E{perf_threshold\nbreached?}
    E -->|No| F[Return: retrained=False]
    E -->|Yes| G[fit_forecaster on current data]
    G --> H[evaluate new model]
    H --> I{new_metric\n< incumbent?}
    I -->|Yes| J[save_checkpoint\n+ log_to_mlflow]
    I -->|No| K[Return: promoted=False]
    J --> L[Return: promoted=True\n+ POST /reload]
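In plain Python, the decision reduces to roughly the following. This is a simplified sketch of the logic in the diagram, not the flow's actual internals; the threshold direction follows the MAE example above:

def should_promote(drift_summary, drift_threshold, performance_threshold,
                   incumbent_metric, new_metric, minimize=True):
    """Illustrative sketch of the retrain/promote decision -- not the real internals."""
    # Retrain when the drifted-feature fraction exceeds the threshold...
    needs_retrain = drift_summary.dataset_drift_score > drift_threshold
    # ...or when the incumbent breaches the absolute performance threshold
    # (direction shown for a minimized metric such as MAE).
    if not needs_retrain and performance_threshold is not None:
        needs_retrain = incumbent_metric > performance_threshold
    if not needs_retrain:
        return False  # retrained=False, incumbent stays in place
    # Promote only if the newly fitted model beats the incumbent on metric_key.
    return new_metric < incumbent_metric if minimize else new_metric > incumbent_metric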

Incumbent evaluation - error classification#

The flow distinguishes expected first-run conditions from unexpected failures:

# Simplified excerpt from retraining_flow's incumbent-evaluation step
try:
    forecaster.on_load_checkpoint()
    # evaluate incumbent...
except (NotFittedError, ValueError) as exc:
    # Expected: no checkpoint yet on first run
    prefect_log.info("No incumbent model available (%s); will retrain.", exc)
except Exception as exc:
    # Unexpected: corrupted checkpoint, pipeline error
    prefect_log.warning("Unexpected error evaluating incumbent: %s", exc)

Return value - RetrainingResult#

from twiga.pipeline.schemas import RetrainingResult, DriftSummary

result.retrained          # True
result.promoted           # True
result.checkpoint_path    # "/checkpoints/load-forecast/20240601/"
result.mlflow_run_id      # "f7e8d9c0b1a2"
result.metrics            # {"lightgbm": {"mae": 3.91, "rmse": 5.72}}

ds: DriftSummary = result.drift_summary
ds.drift_detected         # True
ds.dataset_drift_score    # 0.6
ds.n_drifted              # 4
ds.feature_drift          # {"temperature": 0.21, "hour": 0.04, ...}
ds.report_path            # "/reports/data_drift_20240601T120000Z.html"

API reference#

twiga.pipeline.flows.retraining_flow(forecaster, reference_data_path, current_data_path, date_col='timestamp', train_end=None, test_start=None, drift_threshold=0.5, performance_threshold=None, metric_key='mae', minimize=True, tune=False, num_trials=20, ensemble_strategy=None, experiment='twiga-retrain', run_name=None, tags=None, report_dir='reports', fastapi_reload_url=None, fastapi_reload_api_key_secret=None)#

Drift-aware retraining flow: check → retrain → promote if better.

Parameters:
  • forecaster (TwigaForecaster) – Configured (unfitted) forecaster. The current checkpoint is loaded for baseline evaluation before retraining.

  • reference_data_path (str | Path) – Path to the reference (historical) dataset.

  • current_data_path (str | Path) – Path to the current production data window.

  • date_col (str) – Timestamp column name.

  • train_end (str | None) – End date for the training split of current data.

  • test_start (str | None) – Start date for the test split of current data.

  • drift_threshold (float) – Fraction of drifted features that triggers retrain.

  • performance_threshold (float | None) – Optional absolute metric threshold that also triggers retraining.

  • metric_key (str) – Metric column used for performance comparison.

  • minimize (bool) – If True, lower metric is better (e.g. MAE, RMSE).

  • tune (bool) – Run HPO during retraining.

  • num_trials (int) – HPO trial budget.

  • ensemble_strategy (str | None) – Ensemble strategy for evaluation.

  • experiment (str) – MLflow experiment name.

  • run_name (str | None) – MLflow run label.

  • tags (dict[str, str] | None) – MLflow tags dict.

  • report_dir (str | Path) – Directory for Evidently HTML reports.

  • fastapi_reload_url (str | None) – Optional webhook URL to hot-reload the API after model promotion (e.g. "http://api:8000").

  • fastapi_reload_api_key_secret (str | None) – Name of a Prefect Secret holding the X-API-Key value for the reload webhook.

Return type:

RetrainingResult

Returns:

RetrainingResult with full decision context: retrained, promoted, drift_summary, checkpoint_path, mlflow_run_id, and metrics.
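
For example, to hot-reload a serving API after promotion (the secret name "twiga-api-key" below is illustrative and must exist as a Prefect Secret block):

from twiga.pipeline import retraining_flow

result = retraining_flow(
    forecaster=forecaster,
    reference_data_path="data/train.parquet",
    current_data_path="data/current_window.parquet",
    fastapi_reload_url="http://api:8000",
    fastapi_reload_api_key_secret="twiga-api-key",  # illustrative secret name
)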


Pipeline schemas#

class twiga.pipeline.schemas.TrainingResult(checkpoint_path, mlflow_run_id, metrics=<factory>)#

Bases: object

Typed output of training_flow().

Variables:
  • checkpoint_path – Absolute path to the saved checkpoint directory.

  • mlflow_run_id – MLflow run ID for the training run.

  • metrics – Aggregated evaluation metrics keyed by model name, then metric name. Example: {"lgbm": {"mae": 0.12, "rmse": 0.18}}.

checkpoint_path: str
metrics: dict[str, dict[str, float]]
mlflow_run_id: str

class twiga.pipeline.schemas.RetrainingResult(retrained, promoted, drift_summary, checkpoint_path=None, mlflow_run_id=None, metrics=None)#

Bases: object

Typed output of retraining_flow().

Variables:
  • retrained – Whether a new model was trained in this run.

  • promoted – Whether the new model replaced the incumbent.

  • drift_summary – Structured Evidently drift report.

  • checkpoint_path – Path to the new checkpoint (None if not promoted).

  • mlflow_run_id – MLflow run ID for the retraining run (None if not retrained or not promoted).

  • metrics – Aggregated evaluation metrics for the new model (None if not promoted).

checkpoint_path: str | None = None
drift_summary: DriftSummary
metrics: dict[str, dict[str, float]] | None = None
mlflow_run_id: str | None = None
promoted: bool
retrained: bool

class twiga.pipeline.schemas.DriftSummary(drift_detected=False, dataset_drift_score=0.0, n_drifted=0, total=0, feature_drift=<factory>, report_path=None, timestamp='')#

Bases: object

Evidently drift report summary produced by the run_drift_check() task.

Variables:
  • drift_detected – True when at least one feature drifted.

  • dataset_drift_score – Fraction of features that drifted (0..1).

  • n_drifted – Number of drifted features.

  • total – Total number of monitored features.

  • feature_drift – Per-feature drift scores keyed by column name.

  • report_path – Absolute path to the saved HTML report (or None).

  • timestamp – ISO-8601 timestamp when the report was generated.

dataset_drift_score: float = 0.0
drift_detected: bool = False
feature_drift: dict[str, float]

classmethod from_dict(d)#

Build from the raw dict returned by run_drift_check().

Unknown keys in d are silently ignored so the schema can evolve without breaking callers that still pass raw dicts.

Return type:

DriftSummary

n_drifted: int = 0
report_path: str | None = None
timestamp: str = ''
total: int = 0
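
A minimal usage sketch, assuming reference_df and current_df DataFrames are already in scope (tasks can be called directly via .fn, as described under Tasks below):

from twiga.pipeline.schemas import DriftSummary
from twiga.pipeline.tasks import run_drift_check

raw = run_drift_check.fn(reference_df, current_df)  # plain dict
ds = DriftSummary.from_dict(raw)                    # typed wrapper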

Tasks#

Each task is individually retryable and can be imported and called directly outside of a flow (useful for notebooks and testing):

# Call without a running Prefect server
from twiga.pipeline.tasks import load_data

train_df, test_df = load_data.fn(
    data_path="data/load.parquet",
    date_col="timestamp",
    train_end="2023-12-31",
    test_start="2024-01-01",
)

load_data#

twiga.pipeline.tasks.load_data(data_path, date_col='timestamp', train_end=None, test_start=None)#

Load and split a time series dataset into train/test splits.

Parameters:
  • data_path (str | Path) – Path to the dataset file (.parquet, .csv, or .tsv).

  • date_col (str) – Name of the timestamp column.

  • train_end (str | None) – Inclusive end date for the training split (YYYY-MM-DD). If None, 80% of data is used for training.

  • test_start (str | None) – Inclusive start date for the test split (YYYY-MM-DD). If None, the remaining 20% is used.

Return type:

tuple[DataFrame, DataFrame]

Returns:

Tuple of (train_df, test_df) DataFrames.

Raises:
  • FileNotFoundError – If the data file does not exist.

  • ValueError – If the file format is unsupported or the date column is missing.
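
Omitting both dates falls back to the chronological 80/20 default:

from twiga.pipeline.tasks import load_data

# No split dates: first 80% of rows become train, the remaining 20% test
train_df, test_df = load_data.fn("data/load.parquet")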

fit_forecaster#

twiga.pipeline.tasks.fit_forecaster(forecaster, train_df, val_df=None, tune=False, num_trials=20)#

Fit (or tune + fit) the Twiga forecaster.

Parameters:
  • forecaster (TwigaForecaster) – Configured but unfitted forecaster instance.

  • train_df (DataFrame) – Training DataFrame.

  • val_df (DataFrame | None) – Optional validation DataFrame for early stopping.

  • tune (bool) – If True, run Optuna hyperparameter tuning before the final fit.

  • num_trials (int) – Number of Optuna trials (used only when tune=True).

Return type:

TwigaForecaster

Returns:

The fitted TwigaForecaster.
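
A quick tuning sketch, with forecaster and train_df assumed in scope:

from twiga.pipeline.tasks import fit_forecaster

# Run 30 Optuna trials, then fit with the best parameters
fitted = fit_forecaster.fn(forecaster, train_df, tune=True, num_trials=30)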

evaluate_model#

twiga.pipeline.tasks.evaluate_model(forecaster, test_df, ensemble_strategy=None)#

Evaluate a fitted forecaster on test data.

Parameters:
  • forecaster (TwigaForecaster) – Fitted TwigaForecaster.

  • test_df (DataFrame) – Test DataFrame.

  • ensemble_strategy (str | None) – Optional ensemble aggregation strategy.

Return type:

tuple[DataFrame, DataFrame]

Returns:

Tuple of (predictions_df, metrics_df) as returned by the forecaster.

save_checkpoint#

twiga.pipeline.tasks.save_checkpoint(forecaster)#

Save the fitted forecaster checkpoint to disk.

Parameters:

forecaster (TwigaForecaster) – Fitted TwigaForecaster.

Return type:

str

Returns:

Absolute path to the checkpoint directory.

Raises:

RuntimeError – If the checkpoint path is not configured.

run_drift_check#

twiga.pipeline.tasks.run_drift_check(reference_df, current_df, report_dir='reports', feature_cols=None)#

Run an Evidently data drift check and publish a Prefect artifact.

Parameters:
  • reference_df (DataFrame) – Training reference DataFrame.

  • current_df (DataFrame) – Current production data window.

  • report_dir (str | Path) – Directory for saving the HTML report.

  • feature_cols (list[str] | None) – Subset of features to include. Defaults to all shared numeric columns.

Return type:

dict[str, Any]

Returns:

Drift summary dict with keys drift_detected, dataset_drift_score, n_drifted, total, feature_drift, report_path, and timestamp.

log_to_mlflow#

twiga.pipeline.tasks.log_to_mlflow(forecaster, metrics_df, predictions_df=None, experiment='twiga', run_name=None, tags=None)#

Log a training/retraining run to MLflow using TwigaTracker.

Parameters:
  • forecaster (TwigaForecaster) – Fitted TwigaForecaster.

  • metrics_df (DataFrame) – Metrics DataFrame from evaluate_model.

  • predictions_df (DataFrame | None) – Optional predictions DataFrame for interactive tables.

  • experiment (str) – MLflow experiment name.

  • run_name (str | None) – Optional human-readable run label.

  • tags (dict[str, str] | None) – Optional MLflow tags dict.

Return type:

str

Returns:

The MLflow run ID.


Scheduling with Prefect deployments#

Deploy training_flow as a Prefect deployment to run on a schedule:

from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from twiga.pipeline import training_flow

deployment = Deployment.build_from_flow(
    flow=training_flow,
    name="weekly-retrain",
    schedule=CronSchedule(cron="0 2 * * 1"),  # Every Monday at 02:00
    parameters={
        "data_path": "data/load.parquet",
        "experiment": "load-forecast",
    },
)
deployment.apply()

Then start an agent to pick up the scheduled runs:

prefect agent start --work-queue default
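
retraining_flow can be scheduled the same way; a sketch reusing the Deployment and CronSchedule imports above (paths, names, and the cron schedule are illustrative):

from twiga.pipeline import retraining_flow

retrain_deployment = Deployment.build_from_flow(
    flow=retraining_flow,
    name="weekly-drift-check",
    schedule=CronSchedule(cron="0 3 * * 1"),  # Every Monday at 03:00
    parameters={
        "reference_data_path": "data/train.parquet",
        "current_data_path": "data/current_window.parquet",
        "experiment": "load-forecast-retrain",
    },
)
retrain_deployment.apply()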

End-to-end pipeline#

Combine training_flow → serve → retraining_flow into a complete production cycle:

sequenceDiagram
    participant OPS as Scheduler
    participant PF  as Prefect
    participant API as FastAPI

    Note over OPS,PF: Initial training
    OPS->>PF: trigger training_flow
    PF-->>PF: TrainingResult.checkpoint_path saved

    Note over API: API starts, ModelLoader reads manifest

    Note over OPS,PF: Weekly retraining
    OPS->>PF: trigger retraining_flow
    PF->>PF: DriftSummary → drift check + retrain if needed
    PF-->>API: POST /reload (if RetrainingResult.promoted)

See Architecture for the full system diagram.