9.3 Training Pipeline Steps

As mentioned previously, the machine learning pipeline for this particular use case comprises three primary stages: preprocessing, training, and serving. Furthermore, only the model that achieves the highest accuracy is chosen for deployment, which introduces an additional step for model comparison. Each of these steps will be further explained in the upcoming sections.

9.3.1 Data Preprocessing

The data processing stage involves three primary processes. First, the raw data is loaded from an S3 Bucket. Second, the data is preprocessed and converted into the required format. Finally, the preprocessed data is stored in a way that allows it to be utilized by subsequent models. The data processing functionality is implemented within the given data_preprocessing function. The utils module, imported at the beginning, provides the functionality to access, load, and store data from S3. The data is normalized and transformed into a NumPy array to make it compatible with TensorFlow Keras models. The function returns the names and paths of the preprocessed and uploaded data, making it convenient for selecting them for future model training. Moreover, the data preprocessing stage establishes a connection with MLflow to record the sizes of the datasets.

Importing Required Libraries

The following code imports the necessary libraries and modules required for the code execution. It includes libraries for handling file operations, data manipulation, machine learning, progress tracking, as well as custom modules.

# Imports necessary packages
import os
from datetime import datetime
from typing import Tuple

import mlflow
import numpy as np
from keras.utils.np_utils import to_categorical
from sklearn.utils import shuffle
from src.utils import AWSSession, timeit
from tqdm import tqdm

# Import custom modules
from src.utils import AWSSession

Data Preprocessing Function Definition

At first, the data_preprocessing function is defined, which performs the data preprocessing steps. The function takes three arguments: mlflow_experiment_id (the MLflow experiment ID for logging), aws_bucket (the S3 bucket for reading raw data and storing preprocessed data), and path_preprocessed (the subdirectory for storing preprocessed data, with a default value of “preprocessed”). The function returns a tuple of four strings representing the paths of the preprocessed data.

@timeit
def data_preprocessing(
    mlflow_experiment_id: str,
    aws_bucket: str,
    path_preprocessed: str = "preprocessed",
) -> Tuple[str, str, str, str]:
    """Preprocesses data for further use within model training. Raw data is read from given S3 Bucket, normalized, and stored ad a NumPy Array within S3 again. Output directory is on "/preprocessed". The shape of the data set is logged to MLflow.

    Args:
        mlflow_experiment_id (str): Experiment ID of the MLflow run to log data
        aws_bucket (str): S3 Bucket to read raw data from and write preprocessed data
        path_preprocessed (str, optional): Subdirectory to store the preprocessed data on the provided S3 Bucket. Defaults to "preprocessed".

    Returns:
        Tuple[str, str, str, str]: Four strings denoting the path of the preprocessed data stored as NumPy Arrays: X_train_data_path, y_train_data_path, X_test_data_path, y_test_data_path
    """

Setting MLflow Tracking URI and AWS Session

Afterward, the MLflow tracking URI is set and an AWS session created using the AWS Access Key obtained from the environment variables and using the custom class AWSSession().

    mlflow_tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
    mlflow.set_tracking_uri(mlflow_tracking_uri)

    # Instantiate aws session based on AWS Access Key
    # AWS Access Key is fetched within AWS Session by os.getenv
    aws_session = AWSSession()
    aws_session.set_sessions()

Setting Paths and Helper Functions

The paths for storing raw and preprocessed data within the S3 bucket are defined in a next step. As well as the helper functions _load_and_convert_images, _create_label and _merge_data. The _load_and_convert_images function loads and converts images from an S3 bucket folder into a NumPy array. The _create_label function creates a label array for a given dataset, while the _merge_data function merges two datasets into a single dataset.

    # Set paths within s3
    path_raw_data = f"s3://{aws_bucket}/data/"

    folder_benign_train = f"{path_raw_data}train/benign"
    folder_malignant_train = f"{path_raw_data}train/malignant"

    folder_benign_test = f"{path_raw_data}test/benign"
    folder_malignant_test = f"{path_raw_data}test/malignant"

    # Inner helper functions to load the data to a NumPy Array, create labels, and merge Array
    @timeit
    def _load_and_convert_images(folder_path: str) -> np.array:
        ims = [
            aws_session.read_image_from_s3(s3_bucket=aws_bucket, imname=filename)
            for filename in tqdm(aws_session.list_files_in_bucket(folder_path))
        ]
        return np.array(ims, dtype="uint8")

    def _create_label(x_dataset: np.array) -> np.array:
        return np.zeros(x_dataset.shape[0])

    def _merge_data(set_one: np.array, set_two: np.array) -> np.array:
        return np.concatenate((set_one, set_two), axis=0)

Preprocessing Steps and MLflow Logging

This section performs the main preprocessing steps. It loads images from the S3 bucket, creates labels, merges data, shuffles the data, performs data normalization, and uploads the preprocessed data as NumPy arrays to the S3 bucket. The MLflow logging is also performed, recording the sizes of the training and testing data.

    # Start a MLflow run to log the size of the data
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    with mlflow.start_run(experiment_id=mlflow_experiment_id, run_name=f"{timestamp}_Preprocessing") as run:
        print("\n> Loading images from S3...")
        # Load in training pictures
        X_benign = _load_and_convert_images(folder_benign_train)
        X_malignant = _load_and_convert_images(folder_malignant_train)

        # Load in testing pictures
        X_benign_test = _load_and_convert_images(folder_benign_test)
        X_malignant_test = _load_and_convert_images(folder_malignant_test)

        # Log train-test size in MLflow
        print("\n> Log data parameters")
        mlflow.log_param("train_size_benign", X_benign.shape[0])
        mlflow.log_param("train_size_malignant", X_malignant.shape[0])
        mlflow.log_param("test_size_benign", X_benign_test.shape[0])
        mlflow.log_param("test_size_malignant", X_malignant_test.shape[0])

        print("\n> Preprocessing...")
        # Create labels
        y_benign = _create_label(X_benign)
        y_malignant = _create_label(X_malignant)

        y_benign_test = _create_label(X_benign_test)
        y_malignant_test = _create_label(X_malignant_test)

        # Merge data
        y_train = _merge_data(y_benign, y_malignant)
        y_test = _merge_data(y_benign_test, y_malignant_test)

        X_train = _merge_data(X_benign, X_malignant)
        X_test = _merge_data(X_benign_test, X_malignant_test)

        # Shuffle data
        X_train, y_train = shuffle(X_train, y_train)
        X_test, y_test = shuffle(X_test, y_test)

        y_train = to_categorical(y_train, num_classes=2)
        y_test = to_categorical(y_test, num_classes=2)

        # With data augmentation to prevent overfitting
        X_train = X_train / 255.0
        X_test = X_test / 255.0

Uploading preprocessed data

The four preprocessed numpy arrays (X_train, y_train, X_test, y_test) are uploaded to an S3 bucket. The arrays are stored as pickle files with specific file keys in the bucket. Finally, the paths of the preprocessed data are create and and returned as a tuple of strings.

    print("\n> Upload numpy arrays to S3...")
    aws_session.upload_npy_to_s3(
        data=X_train,
        s3_bucket=aws_bucket,
        file_key=f"{path_preprocessed}/X_train.pkl",
    )
    aws_session.upload_npy_to_s3(
        data=y_train,
        s3_bucket=aws_bucket,
        file_key=f"{path_preprocessed}/y_train.pkl",
    )
    aws_session.upload_npy_to_s3(
        data=X_test,
        s3_bucket=aws_bucket,
        file_key=f"{path_preprocessed}/X_test.pkl",
    )
    aws_session.upload_npy_to_s3(
        data=y_test,
        s3_bucket=aws_bucket,
        file_key=f"{path_preprocessed}/y_test.pkl",
    )

    X_train_data_path = f"{path_preprocessed}/X_train.pkl"
    y_train_data_path = f"{path_preprocessed}/y_train.pkl"
    X_test_data_path = f"{path_preprocessed}/X_test.pkl"
    y_test_data_path = f"{path_preprocessed}/y_test.pkl"
    
    # Return directory paths of the data stored in S3
    return X_train_data_path, y_train_data_path, X_test_data_path, y_test_data_path

9.3.2 Model Training

The training step is designed to accommodate different models based on the selected model. The custom model.utils package, imported at the beginning, enables the selection and retrieval of models. The chosen model can be specified by passing its name to the get_model function, which then returns the corresponding model. These models are implemented using TensorFlow Keras and their code is stored in the /model directory. The model is trained using the model_params parameters provided to the training function, which include all the necessary hyperparameters. The training and evaluation are conducted using the preprocessed data from the previous step, which is downloaded from S3 at the beginning. Depending on the selected model, a KFold cross-validation is performed to improve the model’s fit.

MLflow is utilized to track the model’s progress. By invoking mlflow.start_run(), a new MLflow run is initiated. The model_params are logged using mlflow.log_params, and MLflow autolog is enabled for Keras models through mlflow.keras.autolog(). After successful training, the models are stored in the model registry. The trained model is logged using mlflow.keras.register_model, with the specified model_name as the destination.

The function returns the MLflow run ID and crucial information about the model, such as its name, version, and stage.

Importing Dependencies

This section imports the necessary dependencies for the code, including libraries for machine learning, data manipulation, and utility functions.

# Imports necessary packages
import json
import os
from datetime import datetime
from enum import Enum
from typing import Tuple

import mlflow
import mlflow.keras
import numpy as np
from keras import backend as K
from keras.callbacks import ReduceLROnPlateau
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold

# Import custom modules
from src.model.utils import Model_Class, get_model
from src.utils import AWSSession

Defining the train_model Function

The actual code starts by defining the train_model function, which takes several parameters for training a machine learning model, logging the results to MLflow, and returning relevant information. The MLflow tracking URI is retrieved from the environment variable and sets it as the tracking URI for MLflow.

def train_model(
    mlflow_experiment_id: str,
    model_class: Enum,
    model_params: dict,
    aws_bucket: str,
    import_dict: dict = {},
) -> Tuple[str, str, int, str]:
    """
    Trains a machine learning model and logs the results to MLflow.

    Args:
        mlflow_experiment_id (str): The ID of the MLflow experiment to log the results.
        model_class (Enum): The class of the model to train.
        model_params (dict): A dictionary containing the parameters for the model.
        aws_bucket (str): The AWS S3 bucket name for data storage.
        import_dict (dict, optional): A dictionary containing paths for importing data. Defaults to {}.

    Returns:
        Tuple[str, str, int, str]: A tuple containing the run ID, model name, model version, and current stage.

    Raises:
        None
    """
    mlflow_tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
    mlflow.set_tracking_uri(mlflow_tracking_uri)

Loading Data

This section handles the loading of data required for training the model. It retrieves the file paths for the training and testing data from the import_dict parameter and loads the corresponding NumPy arrays from an AWS S3 bucket using the AWSSession class.

    print("\n> Loading data...")
    X_train_data_path = import_dict.get("X_train_data_path")
    y_train_data_path = import_dict.get("y_train_data_path")
    X_test_data_path = import_dict.get("X_test_data_path")
    y_test_data_path = import_dict.get("y_test_data_path")

    # Instantiate aws session based on AWS Access Key
    # AWS Access Key is fetched within AWS Session by os.getenv
    aws_session = AWSSession()
    aws_session.set_sessions()

    # Read NumPy Arrays from S3
    X_train = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=X_train_data_path)
    y_train = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=y_train_data_path)
    X_test = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=X_test_data_path)
    y_test = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=y_test_data_path)

Training the Model

After the data is loaded, the training process for the machine learning model is started. It begins by printing the model class and generating a timestamp for the run name. Then, it starts an MLflow run with the specified experiment ID and run name. The model parameters are logged using MLflow’s log_params function. Additionally, a callback for reducing the learning rate during training is configured using the ReduceLROnPlateau class from Keras.

The model training handles two different scenarios based on the selected model_class. If it is set to cross-validation (Model_Class.CrossVal), the model is trained using cross-validation. Otherwise, it is trained using the specified model class.

    print("\n> Training model...")
    print(model_class)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    with mlflow.start_run(experiment_id=mlflow_experiment_id, run_name=f"{timestamp}-{model_class}") as run:
        mlflow.log_params(model_params)
        learning_rate_reduction = ReduceLROnPlateau(monitor="accuracy", patience=5, verbose=1, factor=0.5, min_lr=1e-7)

        # If CrossVal is selected, train BasicNet as Cross-Validated Model
        if model_class == Model_Class.CrossVal.value:
            kfold = KFold(n_splits=3, shuffle=True, random_state=11)
            cvscores = []
            for train, test in kfold.split(X_train, y_train):
                model = get_model(Model_Class.Basic.value, model_params)

                # Train Model
                model.fit(
                    X_train[train],
                    y_train[train],
                    epochs=model_params.get("epochs"),
                    batch_size=model_params.get("batch_size"),
                    verbose=model_params.get("verbose"),
                )
                scores = model.evaluate(X_train[test], y_train[test], verbose=0)
                print("%s: %.2f%%" % (model.metrics_names[1], scores[1] * 100))
                cvscores.append(scores[1] * 100)
                K.clear_session()
        else:
            model = get_model(model_class, model_params)
            mlflow.keras.autolog()
            
            # Train Model
            model.fit(
                X_train,
                y_train,
                validation_split=model_params.get("validation_split"),
                epochs=model_params.get("epochs"),
                batch_size=model_params.get("batch_size"),
                verbose=model_params.get("verbose"),
                callbacks=[learning_rate_reduction],
            )
            mlflow.keras.autolog(disable=True)

Testing and Evaluating the Model

After the model training, the trained model is tested on the test data and its prediction accuracy evaluated. The accuracy score is calculated using the accuracy_score function from scikit-learn and logged as a metric using MLflow. Afterward, the trained and evaluated model is registered with MLflow using the register_model function. The resulting model name, version, and stage are obtained to finally return them in the functions return statement.

        run_id = run.info.run_id
        model_uri = f"runs:/{run_id}/{model_class}"

        # Testing model on test data to evaluate
        print("\n> Testing model...")
        y_pred = model.predict(X_test)
        prediction_accuracy = accuracy_score(np.argmax(y_test, axis=1), np.argmax(y_pred, axis=1))
        mlflow.log_metric("prediction_accuracy", prediction_accuracy)
        print(f"Prediction Accuracy: {prediction_accuracy}")

        print("\n> Register model...")
        mv = mlflow.register_model(model_uri, model_class)

    # Return run ID, model name, model version, and current stage of the model
    return run_id, mv.name, mv.version, mv.current_stage

9.3.3 Data Preprocessing

The data processing stage involves three primary processes. First, the raw data is loaded from an S3 Bucket. Second, the data is preprocessed and converted into the required format. Finally, the preprocessed data is stored in a way that allows it to be utilized by subsequent models. The data processing functionality is implemented within the given data_preprocessing function. The utils module, imported at the beginning, provides the functionality to access, load, and store data from S3. The data is normalized and transformed into a NumPy array to make it compatible with TensorFlow Keras models. The function returns the names and paths of the preprocessed and uploaded data, making it convenient for selecting them for future model training. Moreover, the data preprocessing stage establishes a connection with MLflow to record the sizes of the datasets.

At the beginning of each pipeline step are the import statements for various Python modules and libraries used in the code. They will not be mentioned within the description, but you can look them up in the provided code.

The code starts with the function definition data_preprocessing. The function takes several input parameters and returns a tuple with paths to the preprocessed data stored as NumPy arrays. The function is equipped with a @timeit decorator, which serves to measure the execution time of the preprocessing operation.

@timeit
def data_preprocessing(
    mlflow_experiment_id: str,
    aws_bucket: str,
    path_preprocessed: str = "preprocessed",
) -> Tuple[str, str, str, str]:
    """Preprocesses data for further use within model training. Raw data is read from given S3 Bucket, normalized, and stored ad a NumPy Array within S3 again. Output directory is on "/preprocessed". The shape of the data set is logged to MLflow.

    Args:
        mlflow_experiment_id (str): Experiment ID of the MLflow run to log data
        aws_bucket (str): S3 Bucket to read raw data from and write preprocessed data
        path_preprocessed (str, optional): Subdirectory to store the preprocessed data on the provided S3 Bucket. Defaults to "preprocessed".

    Returns:
        Tuple[str, str, str, str]: Four strings denoting the path of the preprocessed data stored as NumPy Arrays: X_train_data_path, y_train_data_path, X_test_data_path, y_test_data_path
    """

It continues by retrieving the MLflow tracking URI from an environment variable and setting it as the tracking URI for MLflow. An AWS session is set up based on AWS Access Key and AWS Secret Access Key, which is similarly provided using environment variables. The AWSSession Object is a custom class to handle the interaction with S3 buckets. Afterward. the paths within an S3 bucket for different data folders are defined, including training and testing data for benign and malignant cases.

mlflow_tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
mlflow.set_tracking_uri(mlflow_tracking_uri)

# Instantiate aws session based on AWS Access Key
# AWS Access Key is fetched within AWS Session by os.getenv
aws_session = AWSSession()
aws_session.set_sessions()

# Set paths within s3
path_raw_data = f"s3://{aws_bucket}/data/"

folder_benign_train = f"{path_raw_data}train/benign"
folder_malignant_train = f"{path_raw_data}train/malignant"

folder_benign_test = f"{path_raw_data}test/benign"
folder_malignant_test = f"{path_raw_data}test/malignant"

The code proceeds by defining three hidden functions that play a crucial role in the preprocessing of the data:

Image Loading and Conversion: A function named _load_and_convert_images designed to handle the loading and conversion of images retrieved from an S3 bucket folder into a NumPy array. It utilizes the aws_session to access and process these images.

Creating Labels: The following function, _create_label, serves the purpose of generating a label array suitable for a given dataset. This function is responsible for preparing labels essential for classification tasks. The inner function, create_label, undertakes the task of constructing a label array that corresponds to a specific dataset, thus facilitating classification operations.

Merging Data: Lastly, the function _merge_data is introduced to combine two datasets into a unified dataset. This inner function, _merge_data_, plays the role of merging two distinct datasets into a single, consolidated dataset. It effectively combines data originating from diverse sources, allowing for a comprehensive dataset.

    @timeit
    def _load_and_convert_images(folder_path: str) -> np.array:
        """
        Loads and converts images from an S3 bucket folder into a NumPy array.

        Args:
            folder_path (str): The path to the S3 bucket folder.

        Returns:
            np.array: The NumPy array containing the converted images.

        Raises:
            None
        """
        ims = [
            aws_session.read_image_from_s3(s3_bucket=aws_bucket, imname=filename)
            for filename in tqdm(aws_session.list_files_in_bucket(folder_path))
        ]
        return np.array(ims, dtype="uint8")

    def _create_label(x_dataset: np.array) -> np.array:
        """
        Creates label array for the given dataset.

        Args:
            x_dataset (np.array): The dataset for which labels are to be created.

        Returns:
            np.array: The label array.

        Raises:
            None
        """
        return np.zeros(x_dataset.shape[0])

    def _merge_data(set_one: np.array, set_two: np.array) -> np.array:
        """
        Merges two datasets into a single dataset.

        Args:
            set_one (np.array): The first dataset.
            set_two (np.array): The second dataset.

        Returns:
            np.array: The merged dataset.

        Raises:
            None
        """
        return np.concatenate((set_one, set_two), axis=0)

The code continues further with the actual data preprocessing steps, including calling the previously stated hidden functions and loading images, creating labels, merging data, and performing data normalization and augmentation. An MLflow run is created for this step to log parameters such as the training and testing size of the data.

    # Start a MLflow run to log the size of the data
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    with mlflow.start_run(experiment_id=mlflow_experiment_id, run_name=f"{timestamp}_Preprocessing") as run:
        print("\n> Loading images from S3...")
        # Load in training pictures
        X_benign = _load_and_convert_images(folder_benign_train)
        X_malignant = _load_and_convert_images(folder_malignant_train)

        # Load in testing pictures
        X_benign_test = _load_and_convert_images(folder_benign_test)
        X_malignant_test = _load_and_convert_images(folder_malignant_test)

        # Log train-test size in MLflow
        print("\n> Log data parameters")
        mlflow.log_param("train_size_benign", X_benign.shape[0])
        mlflow.log_param("train_size_malignant", X_malignant.shape[0])
        mlflow.log_param("test_size_benign", X_benign_test.shape[0])
        mlflow.log_param("test_size_malignant", X_malignant_test.shape[0])

        print("\n> Preprocessing...")
        # Create labels
        y_benign = _create_label(X_benign)
        y_malignant = _create_label(X_malignant)

        y_benign_test = _create_label(X_benign_test)
        y_malignant_test = _create_label(X_malignant_test)

        # Merge data
        y_train = _merge_data(y_benign, y_malignant)
        y_test = _merge_data(y_benign_test, y_malignant_test)

        X_train = _merge_data(X_benign, X_malignant)
        X_test = _merge_data(X_benign_test, X_malignant_test)

        # Shuffle data
        X_train, y_train = shuffle(X_train, y_train)
        X_test, y_test = shuffle(X_test, y_test)

        y_train = to_categorical(y_train, num_classes=2)
        y_test = to_categorical(y_test, num_classes=2)

        # With data augmentation to prevent overfitting
        X_train = X_train / 255.0
        X_test = X_test / 255.0

Once the data has been normalized and converted into a NumPy array, it is then transferred to an S3 bucket with the assistance of the aws_session. The function returns the names and paths of the preprocessed data that has been uploaded. This information is made available for use in subsequent functions.

        print("\n> Upload numpy arrays to S3...")
        aws_session.upload_npy_to_s3(
            data=X_train,
            s3_bucket=aws_bucket,
            file_key=f"{path_preprocessed}/X_train.pkl",
        )
        aws_session.upload_npy_to_s3(
            data=y_train,
            s3_bucket=aws_bucket,
            file_key=f"{path_preprocessed}/y_train.pkl",
        )
        aws_session.upload_npy_to_s3(
            data=X_test,
            s3_bucket=aws_bucket,
            file_key=f"{path_preprocessed}/X_test.pkl",
        )
        aws_session.upload_npy_to_s3(
            data=y_test,
            s3_bucket=aws_bucket,
            file_key=f"{path_preprocessed}/y_test.pkl",
        )

    X_train_data_path = f"{path_preprocessed}/X_train.pkl"
    y_train_data_path = f"{path_preprocessed}/y_train.pkl"
    X_test_data_path = f"{path_preprocessed}/X_test.pkl"
    y_test_data_path = f"{path_preprocessed}/y_test.pkl"

    return X_train_data_path, y_train_data_path, X_test_data_path, y_test_data_path

9.3.4 Model Training

The training step is designed to accommodate different models based on the selected model. The custom model.utils package, imported at the beginning, enables the selection and retrieval of models. The chosen model can be specified by passing its name to the get_model function, which then returns the corresponding model. These models are implemented using TensorFlow Keras and their code is stored in the /model directory. The model is trained using the model_params parameters provided to the training function, which include all the necessary hyperparameters. The training and evaluation are conducted using the preprocessed data from the previous step, which is downloaded from S3 at the beginning. Depending on the selected model, a KFold cross-validation is performed to improve the model’s fit.

MLflow is utilized to track the model’s progress. By invoking mlflow.start_run(), a new MLflow run is initiated. The model_params are logged using mlflow.log_params, and MLflow autolog is enabled for Keras models through mlflow.keras.autolog(). After successful training, the models are stored in the model registry. The trained model is logged using mlflow.keras.register_model, with the specified model_name as the destination.

The Function Definition train_model takes several input parameters, including mlflow_experiment_id, model_class, model_params, aws_bucket, and an optional import_dict for importing data. It returns a tuple containing information about the run and model. The code of the functions starts by setting the tracking URI for MLflow.

def train_model(
    mlflow_experiment_id: str,
    model_class: Enum,
    model_params: dict,
    aws_bucket: str,
    import_dict: dict = {},
) -> Tuple[str, str, int, str]:
    """
    Trains a machine learning model and logs the results to MLflow.

    Args:
        mlflow_experiment_id (str): The ID of the MLflow experiment to log the results.
        model_class (Enum): The class of the model to train.
        model_params (dict): A dictionary containing the parameters for the model.
        aws_bucket (str): The AWS S3 bucket name for data storage.
        import_dict (dict, optional): A dictionary containing paths for importing data. Defaults to {}.

    Returns:
        Tuple[str, str, int, str]: A tuple containing the run ID, model name, model version, and current stage.

    Raises:
        None
    """
    mlflow_tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
    mlflow.set_tracking_uri(mlflow_tracking_uri)

Afterward, the data required for training and testing the model is loaded from AWS S3 buckets. It fetches file paths from the import_dict dictionary, and instantiates an AWSSession. It uses the AWSSession class to download NumPy arrays from the specified S3 bucket.


    print("\n> Loading data...")
    X_train_data_path = import_dict.get("X_train_data_path")
    y_train_data_path = import_dict.get("y_train_data_path")
    X_test_data_path = import_dict.get("X_test_data_path")
    y_test_data_path = import_dict.get("y_test_data_path")

    # Instantiate aws session based on AWS Access Key
    # AWS Access Key is fetched within AWS Session by os.getenv
    aws_session = AWSSession()
    aws_session.set_sessions()

    # Read NumPy Arrays from S3
    X_train = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=X_train_data_path)
    y_train = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=y_train_data_path)
    X_test = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=X_test_data_path)
    y_test = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=y_test_data_path)

The training of the machine learning model is contingent upon the chosen model class. If the model_class is set to Model_Class.CrossVal, the training process involves k-fold cross-validation using the BasicNet as the model. Conversely, if the model_class is anything other than Model_Class.CrossVal, the model undergoes training without cross-validation, following the specifications associated with the provided class and parameters.

The selection of the model is based on the model_class Enum, which could be either Model_Class.CrossVal or Model_Class.ResNet50. During the training process, the mlflow.autolog functionality is employed, allowing for the automatic logging of all essential run parameters. This feature can be enabled or disabled as needed to facilitate model training and parameter logging.

print("\n> Training model...")
print(model_class)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
with mlflow.start_run(experiment_id=mlflow_experiment_id, run_name=f"{timestamp}-{model_class}") as run:
    mlflow.log_params(model_params)
    learning_rate_reduction = ReduceLROnPlateau(monitor="accuracy", patience=5, verbose=1, factor=0.5, min_lr=1e-7)

    # If CrossVal is selected, train BasicNet as Cross-Validated Model
    if model_class == Model_Class.CrossVal.value:
        kfold = KFold(n_splits=3, shuffle=True, random_state=11)
        cvscores = []
        for train, test in kfold.split(X_train, y_train):
            model = get_model(Model_Class.Basic.value, model_params)
            # Train Model
            model.fit(
                X_train[train],
                y_train[train],
                epochs=model_params.get("epochs"),
                batch_size=model_params.get("batch_size"),
                verbose=model_params.get("verbose"),
            )
            scores = model.evaluate(X_train[test], y_train[test], verbose=0)
            print("%s: %.2f%%" % (model.metrics_names[1], scores[1] * 100))
            cvscores.append(scores[1] * 100)
            K.clear_session()
    # TODO: not very safe, create if-else on other Enums
    else:
        model = get_model(model_class, model_params)
        mlflow.keras.autolog()
        # Train Model
        model.fit(
            X_train,
            y_train,
            validation_split=model_params.get("validation_split"),
            epochs=model_params.get("epochs"),
            batch_size=model_params.get("batch_size"),
            verbose=model_params.get("verbose"),
            callbacks=[learning_rate_reduction],
        )
        mlflow.keras.autolog(disable=True)

    run_id = run.info.run_id
    model_uri = f"runs:/{run_id}/{model_class}"

After model training, the trained model is tested on a separate test dataset and log the prediction accuracy as a metric in MLflow. The model is then registered with MLflow and necessary metadata about the registered model are stored in the variable mv.

# Testing model on test data to evaluate
print("\n> Testing model...")
y_pred = model.predict(X_test)
prediction_accuracy = accuracy_score(np.argmax(y_test, axis=1), np.argmax(y_pred, axis=1))
mlflow.log_metric("prediction_accuracy", prediction_accuracy)
print(f"Prediction Accuracy: {prediction_accuracy}")
print("\n> Register model...")
mv = mlflow.register_model(model_uri, model_class)

Finally, the function returns a tuple containing the run ID and crucial information about the model, such as its name, version, and stage.

return run_id, mv.name, mv.version, mv.current_stage

9.3.5 Model Comparison

The compare_models function is responsible for model comparison within MLflow. It evaluates MLflow models by considering a specified metric and promotes the best-performing model to the "Staging" stage within the MLflow Registry. This function accepts two arguments: input_dict, which is a dictionary containing model names and run IDs, and an optional metric parameter (defaulting to "prediction_accuracy") used for conducting the comparison.

def compare_models(input_dict: dict, metric: str = "prediction_accuracy") -> Tuple[str, str, int]:
    """
    Compares a given set of MLflow models based on their logged metric. The model with the best metric will be
    transferred to a "Staging" stage within the MLflow Registry.

    Args:
        input_dict (dict): A dictionary containing the names and run IDs of the MLflow models to compare.
        metric (str, optional): The metric to compare the models. Defaults to "prediction_accuracy".

    Returns:
        Tuple[str, str, int]: A tuple containing the name of the best performing model, its MLflow URI,
                              and the version of the model.

    Raises:
        None
    """
    mlflow_tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
    mlflow.set_tracking_uri(mlflow_tracking_uri)

Once the MLflow tracking URI is configured to align with the “MLFLOW_TRACKING_URI” environment variable, an MLflow client is established. This client is integral to a process known as the Model Comparison Loop. In this loop, models listed in the input_dict dictionary are individually processed. The loop retrieves their respective metrics and aggregates this data within the all_results dictionary, facilitating direct model comparisons.

    client = mlflow.MlflowClient(tracking_uri=mlflow_tracking_uri)

    all_results = {}
    for key, value in input_dict.items():
        # extract params/metrics data for run `test_run_id` in a single dict
        model_results_data_dict = client.get_run(value).data.to_dictionary()
        # get params and metrics for this run (test_run_id)
        model_results_accuracy = model_results_data_dict["metrics"][metric]
        all_results[key] = model_results_accuracy

    # Get model with maximum accuracy
    serving_model_name = max(all_results, key=all_results.get)
    serving_model_version = client.get_latest_versions(name=serving_model_name, stages=["None"])[0].version
    print(f"acc_dict: {all_results}")
    print(f"acc_dict_model: {serving_model_name}")
    print(f"latest_model_version: {serving_model_version}")

Subsequently, the model with the highest accuracy, as determined from the gathered metrics, is singled out for special attention. This exceptional model is then moved to the "Staging" stage within the MLflow Registry, indicating its preparedness for subsequent evaluation and deployment. As a result, a tuple is returned, containing the name of the top-performing model, its corresponding MLflow URI, and the model’s version information.

    # Transition model to stage "Staging"
    model_stage = "Staging"
    client.transition_model_version_stage(name=serving_model_name, version=serving_model_version, stage=model_stage)
    serving_model_uri = f"models:/{serving_model_name}/{model_stage}"

    return serving_model_name, serving_model_uri, serving_model_version

9.3.6 Model Deployment & Serving

This code serves a specific purpose: to simplify the deployment of MLflow models to AWS SageMaker. It achieves this by meticulously configuring and overseeing the deployment process through a combination of environment variables and utility functions. By making use of MLflow’s SageMaker integration, this code automates the deployment of a pre-trained TensorFlow model from the MLflow registry to a SageMaker instance.

The primary responsibility for deploying an MLflow model to an AWS SageMaker endpoint lies with the deploy_model_to_sagemaker function. To kickstart the deployment process, various AWS and MLflow-related environment variables, essential for the deployment, are initially fetched.

def deploy_model_to_sagemaker(
    mlflow_model_name: str,
    mlflow_model_uri: str,
    mlflow_experiment_name: str,
    mlflow_model_version: int,
    sagemaker_endpoint_name: str,
    sagemaker_instance_type: str,
) -> bool:
    """
    Deploy a machine learning model to AWS SageMaker from MLflow.

    This function deploys an MLflow model to an AWS SageMaker endpoint using the specified configuration.

    Args:
        mlflow_model_name (str): The name of the MLflow model to deploy.
        mlflow_model_uri (str): The URI of the MLflow model from the registry.
        mlflow_experiment_name (str): The name of the MLflow experiment containing the model.
        mlflow_model_version (int): The version of the MLflow model to deploy.
        sagemaker_endpoint_name (str): The desired name for the SageMaker endpoint.
        sagemaker_instance_type (str): The SageMaker instance type for deployment.

    Returns:
        bool: True if the task was completed successfully, otherwise False.
    """
    # Retrieve AWS and MLflow environment variables
    AWS_ID = os.getenv("AWS_ID")
    AWS_REGION = os.getenv("AWS_REGION")
    AWS_ACCESS_ROLE_NAME_SAGEMAKER = os.getenv("AWS_ROLE_NAME_SAGEMAKER")
    ECR_REPOSITORY_NAME = os.getenv("ECR_REPOSITORY_NAME")
    ECR_SAGEMAKER_IMAGE_TAG = os.getenv("ECR_SAGEMAKER_IMAGE_TAG")
    MLFLOW_TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI")

    # Set the MLflow tracking URI
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

Within this main deployment function, there are three utility functions that have been defined:

  • _build_image_url: This function constructs the ECR image URL required for SageMaker.
  • _build_execution_role_arn: It is responsible for generating the SageMaker execution role ARN.
  • _get_mlflow_parameters: This utility function retrieves crucial MLflow model parameters, including the model URI and source code.
    def _build_image_url(
        aws_id: str,
        aws_region: str,
        ecr_repository_name: str,
        ecr_sagemaker_image_tag: str,
    ) -> str:
        """
        Build the ECR image URL for SageMaker.

        Args:
            aws_id (str): AWS account ID.
            aws_region (str): AWS region.
            ecr_repository_name (str): Name of the ECR repository.
            ecr_sagemaker_image_tag (str): Tag for the ECR image.

        Returns:
            str: The ECR image URL for SageMaker.
        """
        image_url = f"{aws_id}.dkr.ecr.{aws_region}.amazonaws.com/{ecr_repository_name}:{ecr_sagemaker_image_tag}"
        return image_url

    def _build_execution_role_arn(aws_id: str, access_role_name: str) -> str:
        """
        Build the SageMaker execution role ARN.

        Args:
            aws_id (str): AWS account ID.
            access_role_name (str): SageMaker execution role name.

        Returns:
            str: The SageMaker execution role ARN.
        """
        execution_role_arn = f"arn:aws:iam::{aws_id}:role/{access_role_name}"
        return execution_role_arn

    def _get_mlflow_parameters(experiment_name: str, model_name: str, model_version: int) -> (str, str, str):
        """
        Retrieve MLflow model parameters.

        Args:
            experiment_name (str): Name of the MLflow experiment.
            model_name (str): Name of the MLflow model.
            model_version (int): Version of the MLflow model.

        Returns:
            Tuple[str, str]: The model URI and source.
        """
        client = mlflow.MlflowClient()
        model_version_details = client.get_model_version(
            name=model_name,
            version=model_version,
        )

        # This is for local
        # experiment_id = dict(mlflow.get_experiment_by_name(experiment_name))["experiment_id"]
        # run_id = model_version_details.run_id
        # model_uri = f"mlruns/{experiment_id}/{run_id}/artifacts/{model_name}"
        model_source = model_version_details.source
        model_source_adapted = f"{model_source.removesuffix(model_name)}model"

        return model_source, model_source_adapted

Following their definition, these utility functions are invoked sequentially. Initially, the ECR image URL for SageMaker and the SageMaker execution role ARN are constructed using _build_image_url and _build_execution_role_arn. Subsequently, _get_mlflow_parameters is called to retrieve pertinent MLflow model parameters, encompassing the model source and any adaptations made to the source code.

    image_url = _build_image_url(
        aws_id=AWS_ID,
        aws_region=AWS_REGION,
        ecr_repository_name=ECR_REPOSITORY_NAME,
        ecr_sagemaker_image_tag=ECR_SAGEMAKER_IMAGE_TAG,
    )
    execution_role_arn = _build_execution_role_arn(aws_id=AWS_ID, access_role_name=AWS_ACCESS_ROLE_NAME_SAGEMAKER)
    model_source, model_source_adapted = _get_mlflow_parameters(
        experiment_name=mlflow_experiment_name,
        model_name=mlflow_model_name,
        model_version=mlflow_model_version,
    )

    print(f"model_source: {model_source}")
    print(f"mlflow_model_uri: {mlflow_model_uri}")
    print(f"model_source_adapted: {model_source_adapted}")

Each of these preceding steps is considered a prerequisite to the actual deployment of the model to SageMaker.

The MLflow model deployment to an AWS SageMaker endpoint is executed via MLflow’s SageMaker integration. This is accomplished by invoking mlflow.sagemaker._deploy with all the previously accumulated information. Given that the setup of the SageMaker instance and environment can be time-consuming, a suitable timeout is established. Ultimately, the function returns True upon successful completion of the deployment task; otherwise, it returns False.

    mlflow.sagemaker._deploy(
        mode="create",
        app_name=sagemaker_endpoint_name,
        model_uri=model_source_adapted,
        image_url=image_url,
        execution_role_arn=execution_role_arn,
        instance_type=sagemaker_instance_type,
        instance_count=1,
        region_name=AWS_REGION,
        timeout_seconds=2400,
    )

    return True