Pipeline Orchestration#
Source files
- twiga/pipeline/tasks.py – Prefect tasks
- twiga/pipeline/flows.py – training_flow, retraining_flow
- twiga/pipeline/schemas.py – TrainingResult, RetrainingResult, DriftSummary
Overview#
twiga.pipeline provides two Prefect flows that
orchestrate the complete Twiga training and retraining lifecycle. Each stage
of the pipeline is a separate Prefect task - enabling independent retries,
logging, and observability in the Prefect UI.
Both flows return typed dataclasses so downstream code gets IDE completion and type checking with no dict-key guesswork.
pip install twiga[mlops]
training_flow#
The full train → evaluate → checkpoint → MLflow pipeline in one call:
from twiga.pipeline import training_flow
from twiga.pipeline.schemas import TrainingResult
result: TrainingResult = training_flow(
forecaster=forecaster,
data_path="data/load.parquet",
date_col="timestamp",
train_end="2023-12-31",
test_start="2024-01-01",
tune=True,
num_trials=30,
experiment="load-forecast-v2",
run_name="lgbm-run-001",
)
print(result.checkpoint_path)
print(result.mlflow_run_id)
print(result.metrics)
Steps#
graph LR
A[load_data] --> B[fit_forecaster]
B --> C[evaluate_model]
C --> D[save_checkpoint]
D --> E[log_to_mlflow]
E --> F[publish_artifact]
| Step | Task | Description |
|---|---|---|
| 1 | load_data | Read parquet/CSV, parse timestamps, split train/test |
| 2 | fit_forecaster | Optionally tune with Optuna, then fit on training data |
| 3 | evaluate_model | Run evaluation of the fitted forecaster on the test split |
| 4 | save_checkpoint | Persist model + write the checkpoint manifest |
| 5 | log_to_mlflow | Log params, metrics, and checkpoint artefacts via TwigaTracker |
| 6 | publish_artifact | Markdown metrics table in the Prefect UI |
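Under the hood the flow simply chains the tasks documented later on this page. A minimal sketch of the same sequence run task-by-task via .fn (illustrative only; it assumes a configured forecaster as in the example above, and omits the UI-only publish_artifact step):
from twiga.pipeline.tasks import (
    load_data,
    fit_forecaster,
    evaluate_model,
    save_checkpoint,
    log_to_mlflow,
)
# Same stages as training_flow, executed eagerly without a Prefect run context.
train_df, test_df = load_data.fn(
    data_path="data/load.parquet",
    date_col="timestamp",
    train_end="2023-12-31",
    test_start="2024-01-01",
)
fitted = fit_forecaster.fn(forecaster, train_df)
metrics_df = evaluate_model.fn(fitted, test_df)
checkpoint_path = save_checkpoint.fn(fitted)
run_id = log_to_mlflow.fn(fitted, metrics_df, experiment="load-forecast-v2")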
Return value - TrainingResult#
from twiga.pipeline.schemas import TrainingResult
result.checkpoint_path # "/checkpoints/load-forecast/20240101/"
result.mlflow_run_id # "a1b2c3d4e5f6"
result.metrics # {"lightgbm": {"mae": 4.23, "rmse": 6.11, "corr": 0.97}}
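Because the result schemas are plain dataclasses (note the metrics=&lt;factory&gt; default in the reference below), they serialize cleanly, e.g. for CI logs - a small sketch:
import dataclasses
import json
# asdict() flattens the dataclass (including the nested metrics dict) to JSON-friendly types.
print(json.dumps(dataclasses.asdict(result), indent=2))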
API reference#
- twiga.pipeline.flows.training_flow(forecaster, data_path, date_col='timestamp', train_end=None, test_start=None, tune=False, num_trials=20, ensemble_strategy=None, experiment='twiga', run_name=None, tags=None, report_dir='reports')#
Full training pipeline: load → fit → evaluate → checkpoint → MLflow.
- Parameters:
  - forecaster (TwigaForecaster) – Configured (unfitted) forecaster instance.
  - data_path (str | Path) – Path to the dataset file (.parquet or .csv).
  - date_col (str) – Name of the timestamp column in the dataset.
  - train_end (str | None) – Inclusive end date for the training split (YYYY-MM-DD).
  - test_start (str | None) – Inclusive start date for the test split (YYYY-MM-DD).
  - tune (bool) – If True, run Optuna HPO before the final fit.
  - num_trials (int) – Number of Optuna trials (used when tune=True).
  - ensemble_strategy (str | None) – Ensemble aggregation strategy for evaluation.
  - experiment (str) – MLflow experiment name.
  - report_dir (str | Path) – Directory for Evidently HTML reports.
- Return type: TrainingResult
- Returns: TrainingResult with checkpoint_path, mlflow_run_id, and metrics.
retraining_flow#
Drift-aware retraining that only promotes the new model if it outperforms the incumbent:
from twiga.pipeline import retraining_flow
from twiga.pipeline.schemas import RetrainingResult
result: RetrainingResult = retraining_flow(
forecaster=forecaster,
reference_data_path="data/train.parquet",
current_data_path="data/current_window.parquet",
drift_threshold=0.5, # retrain if > 50 % of features drifted
performance_threshold=0.15, # also retrain if incumbent MAE > 0.15
metric_key="mae",
experiment="load-forecast-retrain",
)
print(result.retrained, result.promoted)
print(result.drift_summary.drift_detected)
print(result.drift_summary.n_drifted)
Decision logic#
graph TD
A[load reference + current data] --> B[run_drift_check\n→ DriftSummary]
B --> C{drift_fraction\n> threshold?}
C -->|No| D[evaluate incumbent]
C -->|Yes| G
D --> E{perf_threshold\nbreached?}
E -->|No| F[Return: retrained=False]
E -->|Yes| G[fit_forecaster on current data]
G --> H[evaluate new model]
H --> I{new_metric\n< incumbent?}
I -->|Yes| J[save_checkpoint\n+ log_to_mlflow]
I -->|No| K[Return: promoted=False]
J --> L[Return: promoted=True\n+ POST /reload]
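In plain Python, the two decision points above reduce to roughly the following. This is a hedged restatement of the diagram in terms of the flow's documented parameters, not the actual flow code:
def should_retrain(drift_fraction, drift_threshold, incumbent_metric, performance_threshold):
    """First gate: drift volume, then the optional absolute performance floor."""
    if drift_fraction > drift_threshold:
        return True
    if performance_threshold is not None and incumbent_metric is not None:
        return incumbent_metric > performance_threshold
    return False

def should_promote(new_metric, incumbent_metric, minimize=True):
    """Second gate: promote only if the challenger beats the incumbent."""
    if incumbent_metric is None:  # first run: no baseline to compare against
        return True
    return new_metric < incumbent_metric if minimize else new_metric > incumbent_metric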
Incumbent evaluation - error classification#
The flow distinguishes expected first-run conditions from unexpected failures:
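# Excerpt from retraining_flow's incumbent evaluation (prefect_log is the flow's run logger).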
try:
forecaster.on_load_checkpoint()
# evaluate incumbent...
except (NotFittedError, ValueError) as exc:
# Expected: no checkpoint yet on first run
prefect_log.info("No incumbent model available (%s) - will retrain.", exc)
except Exception as exc:
# Unexpected: corrupted checkpoint, pipeline error
prefect_log.warning("Unexpected error evaluating incumbent: %s", exc)
Return value - RetrainingResult#
from twiga.pipeline.schemas import RetrainingResult, DriftSummary
result.retrained # True
result.promoted # True
result.checkpoint_path # "/checkpoints/load-forecast/20240601/"
result.mlflow_run_id # "f7e8d9c0b1a2"
result.metrics # {"lightgbm": {"mae": 3.91, "rmse": 5.72}}
ds: DriftSummary = result.drift_summary
ds.drift_detected # True
ds.dataset_drift_score # 0.6
ds.n_drifted # 4
ds.feature_drift # {"temperature": 0.21, "hour": 0.04, ...}
ds.report_path # "/reports/data_drift_20240601T120000Z.html"
API reference#
- twiga.pipeline.flows.retraining_flow(forecaster, reference_data_path, current_data_path, date_col='timestamp', train_end=None, test_start=None, drift_threshold=0.5, performance_threshold=None, metric_key='mae', minimize=True, tune=False, num_trials=20, ensemble_strategy=None, experiment='twiga-retrain', run_name=None, tags=None, report_dir='reports', fastapi_reload_url=None, fastapi_reload_api_key_secret=None)#
Drift-aware retraining flow: check → retrain → promote if better.
- Parameters:
  - forecaster (TwigaForecaster) – Configured (unfitted) forecaster. The current checkpoint is loaded for baseline evaluation before retraining.
  - reference_data_path (str | Path) – Path to the reference (historical) dataset.
  - current_data_path (str | Path) – Path to the current production data window.
  - date_col (str) – Timestamp column name.
  - train_end (str | None) – End date for the training split of current data.
  - test_start (str | None) – Start date for the test split of current data.
  - drift_threshold (float) – Fraction of drifted features that triggers retrain.
  - performance_threshold (float | None) – Optional absolute metric threshold that also triggers retraining.
  - metric_key (str) – Metric column used for performance comparison.
  - minimize (bool) – If True, lower metric is better (e.g. MAE, RMSE).
  - tune (bool) – Run HPO during retraining.
  - num_trials (int) – HPO trial budget.
  - ensemble_strategy (str | None) – Ensemble strategy for evaluation.
  - experiment (str) – MLflow experiment name.
  - report_dir (str | Path) – Directory for Evidently HTML reports.
  - fastapi_reload_url (str | None) – Optional webhook URL to hot-reload the API after model promotion (e.g. "http://api:8000").
  - fastapi_reload_api_key_secret (str | None) – Name of a Prefect Secret holding the X-API-Key value for the reload webhook.
- Return type: RetrainingResult
- Returns: RetrainingResult with full decision context: retrained, promoted, drift_summary, checkpoint_path, mlflow_run_id, and metrics.
Pipeline schemas#
- class twiga.pipeline.schemas.TrainingResult(checkpoint_path, mlflow_run_id, metrics=<factory>)
Bases: object
Typed output of training_flow().
- Variables:
  - checkpoint_path – Absolute path to the saved checkpoint directory.
  - mlflow_run_id – MLflow run ID for the training run.
  - metrics – Aggregated evaluation metrics keyed by model name, then metric name. Example: {"lgbm": {"mae": 0.12, "rmse": 0.18}}.
- checkpoint_path: str
- mlflow_run_id: str
- class twiga.pipeline.schemas.RetrainingResult(retrained, promoted, drift_summary, checkpoint_path=None, mlflow_run_id=None, metrics=None)
Bases: object
Typed output of retraining_flow().
- Variables:
  - retrained – Whether a new model was trained in this run.
  - promoted – Whether the new model replaced the incumbent.
  - drift_summary – Structured Evidently drift report.
  - checkpoint_path – Path to the new checkpoint (None if not promoted).
  - mlflow_run_id – MLflow run ID for the retraining run (None if not retrained or not promoted).
  - metrics – Aggregated evaluation metrics for the new model (None if not promoted).
- drift_summary: DriftSummary
- promoted: bool
- retrained: bool
- class twiga.pipeline.schemas.DriftSummary(drift_detected=False, dataset_drift_score=0.0, n_drifted=0, total=0, feature_drift=<factory>, report_path=None, timestamp='')
Bases: object
Evidently drift report summary produced by the run_drift_check task.
- Variables:
  - drift_detected – True when at least one feature drifted.
  - dataset_drift_score – Fraction of features that drifted (0..1).
  - n_drifted – Number of drifted features.
  - total – Total number of monitored features.
  - feature_drift – Per-feature drift scores keyed by column name.
  - report_path – Absolute path to the saved HTML report (or None).
  - timestamp – ISO-8601 timestamp when the report was generated.
- dataset_drift_score: float = 0.0
- drift_detected: bool = False
- classmethod from_dict(d)
Build from the raw dict returned by run_drift_check(). Unknown keys in d are silently ignored so the schema can evolve without breaking callers that still pass raw dicts.
- Return type: DriftSummary
- n_drifted: int = 0
- timestamp: str = ''
- total: int = 0
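A small sketch of from_dict's tolerance for extra keys (the raw dict here is hypothetical):
from twiga.pipeline.schemas import DriftSummary
# Extra keys such as "engine_version" are dropped rather than raising TypeError.
raw = {"drift_detected": True, "n_drifted": 4, "total": 7, "engine_version": "0.4"}
ds = DriftSummary.from_dict(raw)
print(ds.drift_detected, ds.n_drifted, ds.total)  # True 4 7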
Tasks#
Each task is individually retryable and can be imported and called directly outside of a flow (useful for notebooks and testing):
# Call without a running Prefect server
from twiga.pipeline.tasks import load_data
train_df, test_df = load_data.fn(
data_path="data/load.parquet",
date_col="timestamp",
train_end="2023-12-31",
test_start="2024-01-01",
)
load_data#
- twiga.pipeline.tasks.load_data(data_path, date_col='timestamp', train_end=None, test_start=None)#
Load and split a time series dataset into train/test splits.
- Parameters:
  - data_path (str | Path) – Path to the dataset file (.parquet, .csv, or .tsv).
  - date_col (str) – Name of the timestamp column.
  - train_end (str | None) – Inclusive end date for the training split (YYYY-MM-DD). If None, 80% of data is used for training.
  - test_start (str | None) – Inclusive start date for the test split (YYYY-MM-DD). If None, the remaining 20% is used.
- Return type: tuple[DataFrame, DataFrame]
- Returns: Tuple of (train_df, test_df) DataFrames.
- Raises:
FileNotFoundError – If the data file does not exist.
ValueError – If the file format is unsupported or the date column is missing.
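For example, omitting both dates falls back to the chronological 80/20 split described above (sketch, reusing the dataset path from earlier on this page):
# No train_end/test_start: first 80% of rows train, remaining 20% test.
train_df, test_df = load_data.fn("data/load.parquet", date_col="timestamp")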
fit_forecaster#
- twiga.pipeline.tasks.fit_forecaster(forecaster, train_df, val_df=None, tune=False, num_trials=20)#
Fit (or tune + fit) the Twiga forecaster.
- Parameters:
  - forecaster (TwigaForecaster) – Configured but unfitted forecaster instance.
  - train_df (DataFrame) – Training DataFrame.
  - val_df (DataFrame | None) – Optional validation DataFrame for early stopping.
  - tune (bool) – If True, run Optuna hyperparameter tuning before the final fit.
  - num_trials (int) – Number of Optuna trials (used only when tune=True).
- Return type: TwigaForecaster
- Returns: The fitted TwigaForecaster.
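A sketch of the tuned variant; val_df is assumed to be a separately held-out validation frame (not produced by load_data):
# Run a 30-trial Optuna study first, then refit on the full training frame.
fitted = fit_forecaster.fn(forecaster, train_df, val_df=val_df, tune=True, num_trials=30)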
evaluate_model#
- twiga.pipeline.tasks.evaluate_model(forecaster, test_df, ensemble_strategy=None)#
Evaluate a fitted forecaster on test data.
save_checkpoint#
- twiga.pipeline.tasks.save_checkpoint(forecaster)#
Save the fitted forecaster checkpoint to disk.
- Parameters:
  - forecaster (TwigaForecaster) – Fitted TwigaForecaster.
- Return type: str
- Returns: Absolute path to the checkpoint directory.
- Raises:
RuntimeError – If the checkpoint path is not configured.
run_drift_check#
- twiga.pipeline.tasks.run_drift_check(reference_df, current_df, report_dir='reports', feature_cols=None)#
Run an Evidently data drift check and publish a Prefect artifact.
- Parameters:
  - reference_df (DataFrame) – Reference (historical) dataset.
  - current_df (DataFrame) – Current production data window.
  - report_dir (str | Path) – Directory for Evidently HTML reports.
  - feature_cols – Optional subset of feature columns to check.
- Return type: dict
- Returns: Drift summary dict with keys drift_detected, dataset_drift_score, n_drifted, total, feature_drift, report_path, timestamp.
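A standalone sketch that pairs the raw dict with DriftSummary.from_dict (file paths reused from the retraining example above):
import pandas as pd
from twiga.pipeline.schemas import DriftSummary
from twiga.pipeline.tasks import run_drift_check

reference_df = pd.read_parquet("data/train.parquet")
current_df = pd.read_parquet("data/current_window.parquet")
raw = run_drift_check.fn(reference_df, current_df, report_dir="reports")
summary = DriftSummary.from_dict(raw)  # upgrade the raw dict to the typed schema
print(f"{summary.n_drifted}/{summary.total} features drifted; report at {summary.report_path}")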
log_to_mlflow#
- twiga.pipeline.tasks.log_to_mlflow(forecaster, metrics_df, predictions_df=None, experiment='twiga', run_name=None, tags=None)#
Log a training/retraining run to MLflow using TwigaTracker.
- Parameters:
  - forecaster (TwigaForecaster) – Fitted TwigaForecaster.
  - metrics_df (DataFrame) – Metrics DataFrame from evaluate_model.
  - predictions_df (DataFrame | None) – Optional predictions DataFrame for interactive tables.
  - experiment (str) – MLflow experiment name.
- Return type: str
- Returns: The MLflow run ID.
Scheduling with Prefect deployments#
Deploy training_flow as a Prefect deployment to run on a schedule:
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from twiga.pipeline import training_flow
deployment = Deployment.build_from_flow(
flow=training_flow,
name="weekly-retrain",
schedule=CronSchedule(cron="0 2 * * 1"), # Every Monday at 02:00
parameters={
"data_path": "data/load.parquet",
"experiment": "load-forecast",
},
)
deployment.apply()
prefect agent start --work-queue default
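On recent Prefect releases the same schedule can also be attached without building a Deployment object, via Flow.serve - a hedged sketch (check that your installed Prefect version exposes .serve):
from twiga.pipeline import training_flow

# Serves the flow in-process and blocks; stop with Ctrl+C.
training_flow.serve(
    name="weekly-retrain",
    cron="0 2 * * 1",  # every Monday at 02:00
    parameters={"data_path": "data/load.parquet", "experiment": "load-forecast"},
)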
End-to-end pipeline#
Combine training_flow → serve → retraining_flow into a complete
production cycle:
sequenceDiagram
participant OPS as Scheduler
participant PF as Prefect
participant API as FastAPI
Note over OPS,PF: Initial training
OPS->>PF: trigger training_flow
PF-->>PF: TrainingResult.checkpoint_path saved
Note over API: API starts, ModelLoader reads manifest
Note over OPS,PF: Weekly retraining
OPS->>PF: trigger retraining_flow
PF->>PF: DriftSummary → drift check + retrain if needed
PF-->>API: POST /reload (if RetrainingResult.promoted)
See Architecture for the full system diagram.