9.3 Training Pipeline Steps
As mentioned previously, the machine learning pipeline for this use case comprises three primary stages: preprocessing, training, and serving. In addition, only the model that achieves the highest accuracy is chosen for deployment, which introduces an additional step for model comparison. Each of these steps is explained in the following sections.
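To make the flow between these steps concrete before diving into each one, the following sketch chains the four functions described in this section as plain Python calls. It is purely illustrative: the module path src.steps, the experiment ID, the bucket name, the model class names, and all hyperparameter values are placeholders, and in the actual project the steps are executed by the workflow orchestrator rather than by a single script.

# Hypothetical end-to-end wiring of the pipeline steps; module paths and values are placeholders.
from src.steps import (  # assumed module path
    compare_models,
    data_preprocessing,
    deploy_model_to_sagemaker,
    train_model,
)

EXPERIMENT_ID = "1"          # placeholder MLflow experiment ID
BUCKET = "skin-cancer-data"  # placeholder S3 bucket

# 1) Preprocessing returns the S3 keys of the prepared NumPy arrays.
x_tr, y_tr, x_te, y_te = data_preprocessing(mlflow_experiment_id=EXPERIMENT_ID, aws_bucket=BUCKET)
paths = {
    "X_train_data_path": x_tr,
    "y_train_data_path": y_tr,
    "X_test_data_path": x_te,
    "y_test_data_path": y_te,
}

# 2) Train one or more candidate models and collect {registered model name: run ID}.
candidates = {}
for model_class in ["Basic", "ResNet50"]:  # names assumed from the Model_Class enum
    run_id, name, version, stage = train_model(
        mlflow_experiment_id=EXPERIMENT_ID,
        model_class=model_class,
        model_params={"epochs": 10, "batch_size": 64, "verbose": 2, "validation_split": 0.2},
        aws_bucket=BUCKET,
        import_dict=paths,
    )
    candidates[name] = run_id

# 3) Promote the most accurate model to "Staging".
best_name, best_uri, best_version = compare_models(input_dict=candidates)

# 4) Deploy the promoted model to a SageMaker endpoint.
deploy_model_to_sagemaker(
    mlflow_model_name=best_name,
    mlflow_model_uri=best_uri,
    mlflow_experiment_name="skin-cancer",            # placeholder experiment name
    mlflow_model_version=best_version,
    sagemaker_endpoint_name="skin-cancer-endpoint",  # placeholder endpoint name
    sagemaker_instance_type="ml.m5.xlarge",
)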
9.3.1 Data Preprocessing
The data processing stage involves three primary processes. First, the raw data is loaded from an S3 Bucket. Second, the data is preprocessed and converted into the required format. Finally, the preprocessed data is stored in a way that allows it to be utilized by subsequent models. The data processing functionality is implemented within the data_preprocessing function. The utils module, imported at the beginning, provides the functionality to access, load, and store data from S3. The data is normalized and transformed into a NumPy array to make it compatible with TensorFlow Keras models. The function returns the names and paths of the preprocessed and uploaded data, making it convenient for selecting them for future model training. Moreover, the data preprocessing stage establishes a connection with MLflow to record the sizes of the datasets.
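The utils module itself is not reproduced here. As an orientation aid, the following sketch shows the interface this section assumes it provides; the method names mirror the calls made in the preprocessing and training code below, while the bodies are placeholders rather than the project's actual implementation.

# Illustrative interface sketch of src.utils; bodies are placeholders, not project code.
import time
from functools import wraps
from typing import Callable, List

import numpy as np


def timeit(func: Callable) -> Callable:
    """Decorator that prints the execution time of the wrapped function."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        print(f"{func.__name__} took {time.perf_counter() - start:.2f}s")
        return result
    return wrapper


class AWSSession:
    """Thin wrapper around the AWS SDK used to read and write data on S3 (interface sketch only)."""

    def set_sessions(self) -> None:
        """Create the AWS session from credentials provided via environment variables."""
        ...

    def list_files_in_bucket(self, folder_path: str) -> List[str]:
        """Return the object keys below the given S3 folder."""
        ...

    def read_image_from_s3(self, s3_bucket: str, imname: str) -> np.ndarray:
        """Download a single image and return it as a NumPy array."""
        ...

    def upload_npy_to_s3(self, data: np.ndarray, s3_bucket: str, file_key: str) -> None:
        """Serialize a NumPy array as a pickle file and upload it to S3."""
        ...

    def download_npy_from_s3(self, s3_bucket: str, file_key: str) -> np.ndarray:
        """Download a pickled NumPy array from S3."""
        ...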
Importing Required Libraries
The following code imports the necessary libraries and modules required for the code execution. It includes libraries for handling file operations, data manipulation, machine learning, progress tracking, as well as custom modules.
# Imports necessary packages
import os
from datetime import datetime
from typing import Tuple

import mlflow
import numpy as np
from keras.utils.np_utils import to_categorical
from sklearn.utils import shuffle
from tqdm import tqdm

# Import custom modules
from src.utils import AWSSession, timeit
Data Preprocessing Function Definition
First, the data_preprocessing function is defined, which performs the data preprocessing steps. The function takes three arguments: mlflow_experiment_id (the MLflow experiment ID for logging), aws_bucket (the S3 bucket for reading raw data and storing preprocessed data), and path_preprocessed (the subdirectory for storing preprocessed data, with a default value of "preprocessed"). The function returns a tuple of four strings representing the paths of the preprocessed data.
@timeit
def data_preprocessing(
    mlflow_experiment_id: str,
    aws_bucket: str,
    path_preprocessed: str = "preprocessed",
) -> Tuple[str, str, str, str]:
    """Preprocesses data for further use within model training. Raw data is read from the given S3 Bucket, normalized, and stored as a NumPy array within S3 again. The output directory is "/preprocessed". The shape of the data set is logged to MLflow.

    Args:
        mlflow_experiment_id (str): Experiment ID of the MLflow run to log data
        aws_bucket (str): S3 Bucket to read raw data from and write preprocessed data to
        path_preprocessed (str, optional): Subdirectory to store the preprocessed data on the provided S3 Bucket. Defaults to "preprocessed".

    Returns:
        Tuple[str, str, str, str]: Four strings denoting the paths of the preprocessed data stored as NumPy arrays: X_train_data_path, y_train_data_path, X_test_data_path, y_test_data_path
    """
Setting MLflow Tracking URI and AWS Session
Afterward, the MLflow tracking URI is set, and an AWS session is created using the custom class AWSSession() together with the AWS Access Key obtained from the environment variables.
    mlflow_tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
    mlflow.set_tracking_uri(mlflow_tracking_uri)

    # Instantiate aws session based on AWS Access Key
    # AWS Access Key is fetched within AWS Session by os.getenv
    aws_session = AWSSession()
    aws_session.set_sessions()
Setting Paths and Helper Functions
The paths for storing raw and preprocessed data within the S3 bucket are defined in the next step, along with the helper functions _load_and_convert_images, _create_label, and _merge_data. The _load_and_convert_images function loads and converts images from an S3 bucket folder into a NumPy array. The _create_label function creates a label array for a given dataset, while the _merge_data function merges two datasets into a single dataset.
    # Set paths within s3
    path_raw_data = f"s3://{aws_bucket}/data/"

    folder_benign_train = f"{path_raw_data}train/benign"
    folder_malignant_train = f"{path_raw_data}train/malignant"

    folder_benign_test = f"{path_raw_data}test/benign"
    folder_malignant_test = f"{path_raw_data}test/malignant"

    # Inner helper functions to load the data to a NumPy array, create labels, and merge arrays
    @timeit
    def _load_and_convert_images(folder_path: str) -> np.array:
        ims = [
            aws_session.read_image_from_s3(s3_bucket=aws_bucket, imname=filename)
            for filename in tqdm(aws_session.list_files_in_bucket(folder_path))
        ]
        return np.array(ims, dtype="uint8")

    def _create_label(x_dataset: np.array) -> np.array:
        return np.zeros(x_dataset.shape[0])

    def _merge_data(set_one: np.array, set_two: np.array) -> np.array:
        return np.concatenate((set_one, set_two), axis=0)
Preprocessing Steps and MLflow Logging
This section performs the main preprocessing steps. It loads images from the S3 bucket, creates labels, merges data, shuffles the data, performs data normalization, and uploads the preprocessed data as NumPy arrays to the S3 bucket. The MLflow logging is also performed, recording the sizes of the training and testing data.
    # Start an MLflow run to log the size of the data
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    with mlflow.start_run(experiment_id=mlflow_experiment_id, run_name=f"{timestamp}_Preprocessing") as run:
        print("\n> Loading images from S3...")
        # Load in training pictures
        X_benign = _load_and_convert_images(folder_benign_train)
        X_malignant = _load_and_convert_images(folder_malignant_train)

        # Load in testing pictures
        X_benign_test = _load_and_convert_images(folder_benign_test)
        X_malignant_test = _load_and_convert_images(folder_malignant_test)

        # Log train-test size in MLflow
        print("\n> Log data parameters")
        mlflow.log_param("train_size_benign", X_benign.shape[0])
        mlflow.log_param("train_size_malignant", X_malignant.shape[0])
        mlflow.log_param("test_size_benign", X_benign_test.shape[0])
        mlflow.log_param("test_size_malignant", X_malignant_test.shape[0])

        print("\n> Preprocessing...")
        # Create labels
        y_benign = _create_label(X_benign)
        y_malignant = _create_label(X_malignant)

        y_benign_test = _create_label(X_benign_test)
        y_malignant_test = _create_label(X_malignant_test)

        # Merge data
        y_train = _merge_data(y_benign, y_malignant)
        y_test = _merge_data(y_benign_test, y_malignant_test)

        X_train = _merge_data(X_benign, X_malignant)
        X_test = _merge_data(X_benign_test, X_malignant_test)

        # Shuffle data
        X_train, y_train = shuffle(X_train, y_train)
        X_test, y_test = shuffle(X_test, y_test)

        y_train = to_categorical(y_train, num_classes=2)
        y_test = to_categorical(y_test, num_classes=2)

        # Normalize pixel values to the range [0, 1]
        X_train = X_train / 255.0
        X_test = X_test / 255.0
Uploading Preprocessed Data
The four preprocessed NumPy arrays (X_train, y_train, X_test, y_test) are uploaded to the S3 bucket. The arrays are stored as pickle files with specific file keys in the bucket. Finally, the paths of the preprocessed data are created and returned as a tuple of strings.
print("\n> Upload numpy arrays to S3...")
aws_session.upload_npy_to_s3(=X_train,
data=aws_bucket,
s3_bucket=f"{path_preprocessed}/X_train.pkl",
file_key
)
aws_session.upload_npy_to_s3(=y_train,
data=aws_bucket,
s3_bucket=f"{path_preprocessed}/y_train.pkl",
file_key
)
aws_session.upload_npy_to_s3(=X_test,
data=aws_bucket,
s3_bucket=f"{path_preprocessed}/X_test.pkl",
file_key
)
aws_session.upload_npy_to_s3(=y_test,
data=aws_bucket,
s3_bucket=f"{path_preprocessed}/y_test.pkl",
file_key
)
= f"{path_preprocessed}/X_train.pkl"
X_train_data_path = f"{path_preprocessed}/y_train.pkl"
y_train_data_path = f"{path_preprocessed}/X_test.pkl"
X_test_data_path = f"{path_preprocessed}/y_test.pkl"
y_test_data_path
# Return directory paths of the data stored in S3
return X_train_data_path, y_train_data_path, X_test_data_path, y_test_data_path
9.3.2 Model Training
The training step is designed to accommodate different models, chosen via the selected model class. The custom model.utils package, imported at the beginning, enables the selection and retrieval of models. The chosen model can be specified by passing its name to the get_model function, which then returns the corresponding model. These models are implemented using TensorFlow Keras, and their code is stored in the /model directory. The model is trained using the model_params parameters provided to the training function, which include all the necessary hyperparameters. Training and evaluation are conducted using the preprocessed data from the previous step, which is downloaded from S3 at the beginning. Depending on the selected model, a KFold cross-validation is performed to improve the model's fit.
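The model.utils module is likewise not listed in this chapter. Based on how it is used below, its interface can be assumed to look roughly like the following sketch; the member names Basic, ResNet50, and CrossVal and their string values are assumptions derived from this section, and the function body is left open because the actual architectures live in the /model directory.

# Assumed interface of src/model/utils.py; names and values are inferred from the calls
# in this section, while the real architectures are defined in /model.
from enum import Enum


class Model_Class(Enum):
    Basic = "Basic"        # small custom CNN ("BasicNet")
    ResNet50 = "ResNet50"  # transfer-learning model
    CrossVal = "CrossVal"  # train BasicNet with k-fold cross-validation


def get_model(model_class: str, model_params: dict):
    """Return a compiled TensorFlow Keras model for the given model class name (interface sketch only)."""
    ...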
MLflow is utilized to track the model's progress. By invoking mlflow.start_run(), a new MLflow run is initiated. The model_params are logged using mlflow.log_params, and MLflow autologging is enabled for Keras models through mlflow.keras.autolog(). After successful training, the model is stored in the model registry: it is registered using mlflow.register_model, with the model class name as the registered model name.
The function returns the MLflow run ID and crucial information about the model, such as its name, version, and stage.
Importing Dependencies
This section imports the necessary dependencies for the code, including libraries for machine learning, data manipulation, and utility functions.
# Imports necessary packages
import json
import os
from datetime import datetime
from enum import Enum
from typing import Tuple
import mlflow
import mlflow.keras
import numpy as np
from keras import backend as K
from keras.callbacks import ReduceLROnPlateau
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold
# Import custom modules
from src.model.utils import Model_Class, get_model
from src.utils import AWSSession
Defining the train_model Function
The actual code starts by defining the train_model function, which takes several parameters for training a machine learning model, logs the results to MLflow, and returns relevant information. The MLflow tracking URI is retrieved from the environment variable and set as the tracking URI for MLflow.
def train_model(
    mlflow_experiment_id: str,
    model_class: Enum,
    model_params: dict,
    aws_bucket: str,
    import_dict: dict = {},
) -> Tuple[str, str, int, str]:
    """
    Trains a machine learning model and logs the results to MLflow.

    Args:
        mlflow_experiment_id (str): The ID of the MLflow experiment to log the results.
        model_class (Enum): The class of the model to train.
        model_params (dict): A dictionary containing the parameters for the model.
        aws_bucket (str): The AWS S3 bucket name for data storage.
        import_dict (dict, optional): A dictionary containing paths for importing data. Defaults to {}.

    Returns:
        Tuple[str, str, int, str]: A tuple containing the run ID, model name, model version, and current stage.
    """
    mlflow_tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
    mlflow.set_tracking_uri(mlflow_tracking_uri)
Loading Data
This section handles the loading of data required for training the model. It retrieves the file paths for the training and testing data from the import_dict parameter and loads the corresponding NumPy arrays from an AWS S3 bucket using the AWSSession class.
print("\n> Loading data...")
= import_dict.get("X_train_data_path")
X_train_data_path = import_dict.get("y_train_data_path")
y_train_data_path = import_dict.get("X_test_data_path")
X_test_data_path = import_dict.get("y_test_data_path")
y_test_data_path
# Instantiate aws session based on AWS Access Key
# AWS Access Key is fetched within AWS Session by os.getenv
= AWSSession()
aws_session
aws_session.set_sessions()
# Read NumPy Arrays from S3
= aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=X_train_data_path)
X_train = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=y_train_data_path)
y_train = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=X_test_data_path)
X_test = aws_session.download_npy_from_s3(s3_bucket=aws_bucket, file_key=y_test_data_path) y_test
Training the Model
After the data is loaded, the training process for the machine learning model is started. It begins by printing the model class and generating a timestamp for the run name. Then, it starts an MLflow run with the specified experiment ID and run name. The model parameters are logged using MLflow’s log_params function. Additionally, a callback for reducing the learning rate during training is configured using the ReduceLROnPlateau class from Keras.
The model training handles two different scenarios based on the selected model_class. If it is set to cross-validation (Model_Class.CrossVal), the model is trained using cross-validation. Otherwise, it is trained using the specified model class.
print("\n> Training model...")
print(model_class)
= datetime.now().strftime("%Y%m%d_%H%M%S")
timestamp with mlflow.start_run(experiment_id=mlflow_experiment_id, run_name=f"{timestamp}-{model_class}") as run:
mlflow.log_params(model_params)= ReduceLROnPlateau(monitor="accuracy", patience=5, verbose=1, factor=0.5, min_lr=1e-7)
learning_rate_reduction
# If CrossVal is selected, train BasicNet as Cross-Validated Model
if model_class == Model_Class.CrossVal.value:
= KFold(n_splits=3, shuffle=True, random_state=11)
kfold = []
cvscores for train, test in kfold.split(X_train, y_train):
= get_model(Model_Class.Basic.value, model_params)
model
# Train Model
model.fit(
X_train[train],
y_train[train],=model_params.get("epochs"),
epochs=model_params.get("batch_size"),
batch_size=model_params.get("verbose"),
verbose
)= model.evaluate(X_train[test], y_train[test], verbose=0)
scores print("%s: %.2f%%" % (model.metrics_names[1], scores[1] * 100))
1] * 100)
cvscores.append(scores[
K.clear_session()else:
= get_model(model_class, model_params)
model
mlflow.keras.autolog()
# Train Model
model.fit(
X_train,
y_train,=model_params.get("validation_split"),
validation_split=model_params.get("epochs"),
epochs=model_params.get("batch_size"),
batch_size=model_params.get("verbose"),
verbose=[learning_rate_reduction],
callbacks
)=True) mlflow.keras.autolog(disable
Testing and Evaluating the Model
After model training, the trained model is tested on the test data and its prediction accuracy is evaluated. The accuracy score is calculated using the accuracy_score function from scikit-learn and logged as a metric using MLflow. Afterward, the trained and evaluated model is registered with MLflow using the register_model function. The resulting model name, version, and stage are obtained and finally returned in the function's return statement.
        run_id = run.info.run_id
        model_uri = f"runs:/{run_id}/{model_class}"

        # Testing model on test data to evaluate
        print("\n> Testing model...")
        y_pred = model.predict(X_test)
        prediction_accuracy = accuracy_score(np.argmax(y_test, axis=1), np.argmax(y_pred, axis=1))
        mlflow.log_metric("prediction_accuracy", prediction_accuracy)
        print(f"Prediction Accuracy: {prediction_accuracy}")

        print("\n> Register model...")
        mv = mlflow.register_model(model_uri, model_class)

    # Return run ID, model name, model version, and current stage of the model
    return run_id, mv.name, mv.version, mv.current_stage
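To make the expected inputs concrete, a call to train_model could look as follows. The experiment ID, bucket name, and hyperparameter values are placeholders; the dictionary keys, however, match the lookups performed via model_params.get(...) and import_dict.get(...) in the code above.

# Example invocation with placeholder values; only the dictionary keys are dictated
# by the lookups inside train_model.
model_params = {
    "epochs": 20,
    "batch_size": 64,
    "verbose": 2,
    "validation_split": 0.2,
}
import_dict = {
    "X_train_data_path": "preprocessed/X_train.pkl",
    "y_train_data_path": "preprocessed/y_train.pkl",
    "X_test_data_path": "preprocessed/X_test.pkl",
    "y_test_data_path": "preprocessed/y_test.pkl",
}

run_id, model_name, model_version, model_stage = train_model(
    mlflow_experiment_id="1",              # placeholder MLflow experiment ID
    model_class=Model_Class.Basic.value,   # the BasicNet model class used above
    model_params=model_params,
    aws_bucket="skin-cancer-data",         # placeholder bucket name
    import_dict=import_dict,
)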
9.3.3 Model Comparison
The compare_models function is responsible for model comparison within MLflow. It evaluates MLflow models by considering a specified metric and promotes the best-performing model to the "Staging" stage within the MLflow Registry. This function accepts two arguments: input_dict, a dictionary containing model names and run IDs, and an optional metric parameter (defaulting to "prediction_accuracy") used for conducting the comparison.
def compare_models(input_dict: dict, metric: str = "prediction_accuracy") -> Tuple[str, str, int]:
    """
    Compares a given set of MLflow models based on their logged metric. The model with the best metric will be
    transferred to a "Staging" stage within the MLflow Registry.

    Args:
        input_dict (dict): A dictionary containing the names and run IDs of the MLflow models to compare.
        metric (str, optional): The metric to compare the models. Defaults to "prediction_accuracy".

    Returns:
        Tuple[str, str, int]: A tuple containing the name of the best performing model, its MLflow URI,
        and the version of the model.
    """
    mlflow_tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
    mlflow.set_tracking_uri(mlflow_tracking_uri)
Once the MLflow tracking URI is configured from the "MLFLOW_TRACKING_URI" environment variable, an MLflow client is established. This client is used in the model comparison loop: each model listed in the input_dict dictionary is processed individually, its logged metric is retrieved, and the results are aggregated in the all_results dictionary, facilitating direct model comparisons.
    client = mlflow.MlflowClient(tracking_uri=mlflow_tracking_uri)

    all_results = {}
    for key, value in input_dict.items():
        # Extract the params/metrics data for this run in a single dict
        model_results_data_dict = client.get_run(value).data.to_dictionary()
        # Get the relevant metric for this run
        model_results_accuracy = model_results_data_dict["metrics"][metric]
        all_results[key] = model_results_accuracy

    # Get model with maximum accuracy
    serving_model_name = max(all_results, key=all_results.get)
    serving_model_version = client.get_latest_versions(name=serving_model_name, stages=["None"])[0].version
    print(f"acc_dict: {all_results}")
    print(f"acc_dict_model: {serving_model_name}")
    print(f"latest_model_version: {serving_model_version}")
Subsequently, the model with the highest accuracy, as determined from the gathered metrics, is transitioned to the "Staging" stage within the MLflow Registry, indicating its readiness for subsequent evaluation and deployment. Finally, a tuple is returned containing the name of the top-performing model, its corresponding MLflow URI, and the model's version.
    # Transition model to stage "Staging"
    model_stage = "Staging"
    client.transition_model_version_stage(name=serving_model_name, version=serving_model_version, stage=model_stage)
    serving_model_uri = f"models:/{serving_model_name}/{model_stage}"

    return serving_model_name, serving_model_uri, serving_model_version
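For illustration, assuming two models were trained and registered in the previous step, the comparison could be invoked as follows; the registered model names and run IDs are placeholders.

# Hypothetical input: keys are registered model names, values are the MLflow run IDs
# returned by train_model for those models.
candidate_runs = {
    "ResNet50": "a1b2c3d4e5f64788a9b0c1d2e3f40516",  # placeholder run ID
    "Basic": "0f1e2d3c4b5a69788796a5b4c3d2e1f0",     # placeholder run ID
}

best_name, best_uri, best_version = compare_models(input_dict=candidate_runs)
# best_uri follows the pattern "models:/<model name>/Staging"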
9.3.4 Model Deployment & Serving
This code serves a specific purpose: to simplify the deployment of MLflow models to AWS SageMaker. It configures and oversees the deployment process through a combination of environment variables and utility functions. By making use of MLflow's SageMaker integration, it automates the deployment of a pre-trained TensorFlow model from the MLflow registry to a SageMaker instance.
The primary responsibility for deploying an MLflow model to an AWS SageMaker endpoint lies with the deploy_model_to_sagemaker function. To start the deployment process, the AWS- and MLflow-related environment variables essential for the deployment are fetched first.
def deploy_model_to_sagemaker(
    mlflow_model_name: str,
    mlflow_model_uri: str,
    mlflow_experiment_name: str,
    mlflow_model_version: int,
    sagemaker_endpoint_name: str,
    sagemaker_instance_type: str,
) -> bool:
    """
    Deploy a machine learning model to AWS SageMaker from MLflow.

    This function deploys an MLflow model to an AWS SageMaker endpoint using the specified configuration.

    Args:
        mlflow_model_name (str): The name of the MLflow model to deploy.
        mlflow_model_uri (str): The URI of the MLflow model from the registry.
        mlflow_experiment_name (str): The name of the MLflow experiment containing the model.
        mlflow_model_version (int): The version of the MLflow model to deploy.
        sagemaker_endpoint_name (str): The desired name for the SageMaker endpoint.
        sagemaker_instance_type (str): The SageMaker instance type for deployment.

    Returns:
        bool: True if the task was completed successfully, otherwise False.
    """
    # Retrieve AWS and MLflow environment variables
    AWS_ID = os.getenv("AWS_ID")
    AWS_REGION = os.getenv("AWS_REGION")
    AWS_ACCESS_ROLE_NAME_SAGEMAKER = os.getenv("AWS_ROLE_NAME_SAGEMAKER")
    ECR_REPOSITORY_NAME = os.getenv("ECR_REPOSITORY_NAME")
    ECR_SAGEMAKER_IMAGE_TAG = os.getenv("ECR_SAGEMAKER_IMAGE_TAG")
    MLFLOW_TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI")

    # Set the MLflow tracking URI
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
Within this main deployment function, three utility functions are defined:
_build_image_url: constructs the ECR image URL required for SageMaker.
_build_execution_role_arn: generates the SageMaker execution role ARN.
_get_mlflow_parameters: retrieves the relevant MLflow model parameters, namely the model source and an adapted model source path.
    def _build_image_url(
        aws_id: str,
        aws_region: str,
        ecr_repository_name: str,
        ecr_sagemaker_image_tag: str,
    ) -> str:
        """
        Build the ECR image URL for SageMaker.

        Args:
            aws_id (str): AWS account ID.
            aws_region (str): AWS region.
            ecr_repository_name (str): Name of the ECR repository.
            ecr_sagemaker_image_tag (str): Tag for the ECR image.

        Returns:
            str: The ECR image URL for SageMaker.
        """
        image_url = f"{aws_id}.dkr.ecr.{aws_region}.amazonaws.com/{ecr_repository_name}:{ecr_sagemaker_image_tag}"
        return image_url

    def _build_execution_role_arn(aws_id: str, access_role_name: str) -> str:
        """
        Build the SageMaker execution role ARN.

        Args:
            aws_id (str): AWS account ID.
            access_role_name (str): SageMaker execution role name.

        Returns:
            str: The SageMaker execution role ARN.
        """
        execution_role_arn = f"arn:aws:iam::{aws_id}:role/{access_role_name}"
        return execution_role_arn

    def _get_mlflow_parameters(experiment_name: str, model_name: str, model_version: int) -> Tuple[str, str]:
        """
        Retrieve MLflow model parameters.

        Args:
            experiment_name (str): Name of the MLflow experiment.
            model_name (str): Name of the MLflow model.
            model_version (int): Version of the MLflow model.

        Returns:
            Tuple[str, str]: The model source and the adapted model source path.
        """
        client = mlflow.MlflowClient()
        model_version_details = client.get_model_version(
            name=model_name,
            version=model_version,
        )

        # This is for local use
        # experiment_id = dict(mlflow.get_experiment_by_name(experiment_name))["experiment_id"]
        # run_id = model_version_details.run_id
        # model_uri = f"mlruns/{experiment_id}/{run_id}/artifacts/{model_name}"

        model_source = model_version_details.source
        model_source_adapted = f"{model_source.removesuffix(model_name)}model"

        return model_source, model_source_adapted
Following their definition, these utility functions are invoked sequentially. Initially, the ECR image URL for SageMaker and the SageMaker execution role ARN are constructed using _build_image_url and _build_execution_role_arn. Subsequently, _get_mlflow_parameters is called to retrieve the pertinent MLflow model parameters, namely the model source and the adapted model source path.
    image_url = _build_image_url(
        aws_id=AWS_ID,
        aws_region=AWS_REGION,
        ecr_repository_name=ECR_REPOSITORY_NAME,
        ecr_sagemaker_image_tag=ECR_SAGEMAKER_IMAGE_TAG,
    )
    execution_role_arn = _build_execution_role_arn(aws_id=AWS_ID, access_role_name=AWS_ACCESS_ROLE_NAME_SAGEMAKER)
    model_source, model_source_adapted = _get_mlflow_parameters(
        experiment_name=mlflow_experiment_name,
        model_name=mlflow_model_name,
        model_version=mlflow_model_version,
    )

    print(f"model_source: {model_source}")
    print(f"mlflow_model_uri: {mlflow_model_uri}")
    print(f"model_source_adapted: {model_source_adapted}")
Each of these preceding steps is considered a prerequisite to the actual deployment of the model to SageMaker.
The MLflow model deployment to an AWS SageMaker endpoint is executed via MLflow's SageMaker integration. This is accomplished by invoking mlflow.sagemaker._deploy with all the previously accumulated information. Given that the setup of the SageMaker instance and environment can be time-consuming, a suitable timeout is established. Ultimately, the function returns True upon successful completion of the deployment task; otherwise, it returns False.
    mlflow.sagemaker._deploy(
        mode="create",
        app_name=sagemaker_endpoint_name,
        model_uri=model_source_adapted,
        image_url=image_url,
        execution_role_arn=execution_role_arn,
        instance_type=sagemaker_instance_type,
        instance_count=1,
        region_name=AWS_REGION,
        timeout_seconds=2400,
    )

    return True
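Once the endpoint is running, it can be queried like any other SageMaker endpoint, which completes the serving part of the pipeline. The following sketch uses boto3's sagemaker-runtime client; the region, endpoint name, and input shape are placeholders, and the exact JSON payload format depends on how the MLflow scoring container baked into the ECR image expects its input (a TensorFlow-Serving-style "instances" document is assumed here).

# Hedged sketch of invoking the deployed endpoint; the payload layout is an assumption,
# not taken from the project code.
import json

import boto3
import numpy as np

runtime = boto3.client("sagemaker-runtime", region_name="eu-central-1")  # placeholder region

sample_image = np.random.rand(1, 224, 224, 3).tolist()  # placeholder input shape

response = runtime.invoke_endpoint(
    EndpointName="skin-cancer-endpoint",  # the name passed as sagemaker_endpoint_name (placeholder)
    ContentType="application/json",
    Body=json.dumps({"instances": sample_image}),
)
prediction = json.loads(response["Body"].read().decode("utf-8"))
print(prediction)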