3.2 Exemplary ML workflow
The following shows an exemplary Airflow DAG that builds an MNIST image classification workflow with Keras and the TaskFlow API. The DAG automates the process of training and testing a convolutional neural network (CNN) on the popular MNIST dataset using Keras, and it demonstrates how to set up a simple task flow for data preprocessing, model training, and model testing. A Python file of the DAG can be found here.
The Airflow DAG is named "tutorial_mnist_keras_taskflow_api" and is designed to be informative and easy to understand. It includes three distinct tasks: preprocess_data(), train_model(data_paths_dict), and test_model(model_data_paths_dict). Together, these tasks form a streamlined workflow for building, training, and evaluating a deep learning model.
The preprocess_data() task preprocesses the MNIST dataset: it loads and scales the image data and converts the class labels into categorical (one-hot) format. The preprocessed data is then saved as NumPy arrays for subsequent use, and the task returns a dictionary containing the file paths to these arrays.
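As a brief illustration of the label conversion, keras.utils.to_categorical turns an integer class label into a one-hot vector of length num_classes; a minimal standalone sketch:

import numpy as np
from tensorflow import keras

# The digit label 3 ...
y = np.array([3])
# ... becomes a one-hot row vector with a 1 at index 3.
print(keras.utils.to_categorical(y, 10))
# [[0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]]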
The train_model(data_paths_dict) task is responsible for building and training the Keras CNN model. It starts by loading the preprocessed data using the file paths provided in the input dictionary. The CNN architecture comprises convolutional and pooling layers, followed by flattening, dropout, and a dense output layer. After the model is compiled, training runs with the specified hyperparameters, such as batch size and number of epochs. Once training is complete, the model is saved to disk, and the task returns a dictionary that extends the input file paths with the saved model's path under the key "model_name".
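As a quick sanity check of the architecture, the feature-map shapes evolve as follows (a sketch assuming Keras' default "valid" padding for Conv2D and stride-2 pooling):

# input image            (28, 28, 1)
# Conv2D(32, 3x3)        (26, 26, 32)
# MaxPooling2D(2x2)      (13, 13, 32)
# Conv2D(64, 3x3)        (11, 11, 64)
# MaxPooling2D(2x2)      (5, 5, 64)
# Flatten                (1600,)
# Dropout(0.5)           (1600,)
# Dense(10, softmax)     (10,)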
The test_model(model_data_paths_dict) task loads the trained Keras model using the model path stored in the dictionary and evaluates its performance on the test dataset. It calculates and prints both the test loss and the test accuracy, providing insight into how well the model generalizes to unseen data.
The DAG execution flow is straightforward. It starts with the preprocess_data() task, which prepares the data for model training. The output of this task is then passed as input to the train_model(data_paths_dict) task, where the CNN model is constructed and trained. Finally, the test_model(model_data_paths_dict) task evaluates the trained model and reports its performance metrics.
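The listing below does not show the import section of the DAG file; based on the APIs used in the code, the imports are presumably along the following lines:

import os
from copy import deepcopy

import numpy as np
import pendulum
from airflow.decorators import dag, task
from tensorflow import keras
from tensorflow.keras import layers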
@dag(
    dag_id="tutorial_mnist_keras_taskflow_api",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "mnist", "keras"],
)
def tutorial_mnist_keras_taskflow_api():
    """
    This Airflow DAG demonstrates a simple task flow for training and testing a Keras model on the MNIST dataset.
    It consists of three tasks: preprocess_data, train_model, and test_model.
    """

    @task()
    def preprocess_data():
        """
        Preprocesses the MNIST dataset, scaling it and saving the preprocessed data as NumPy arrays.
        Returns:
            dict: A dictionary containing file paths to the preprocessed data arrays.
        """
        dirname = f"{os.path.dirname(__file__)}/data/"
        print("dirname: ", dirname)

        num_classes = 10
        path = f"{dirname}mnist.npz"

        (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data(path=path)

        # Scale images to the [0, 1] range
        x_train = x_train.astype("float32") / 255
        x_test = x_test.astype("float32") / 255
        # Make sure images have shape (28, 28, 1)
        x_train = np.expand_dims(x_train, -1)
        x_test = np.expand_dims(x_test, -1)
        print("x_train shape:", x_train.shape)
        print(x_train.shape[0], "train samples")
        print(x_test.shape[0], "test samples")

        # convert class vectors to binary class matrices
        y_train = keras.utils.to_categorical(y_train, num_classes)
        y_test = keras.utils.to_categorical(y_test, num_classes)

        y_train_path = f"{dirname}y_train.npy"
        x_train_path = f"{dirname}x_train.npy"
        y_test_path = f"{dirname}y_test.npy"
        x_test_path = f"{dirname}x_test.npy"

        np.save(y_train_path, y_train)
        np.save(x_train_path, x_train)
        np.save(y_test_path, y_test)
        np.save(x_test_path, x_test)

        data_paths_dict = {
            "y_train_path": y_train_path,
            "x_train_path": x_train_path,
            "y_test_path": y_test_path,
            "x_test_path": x_test_path,
        }
        return data_paths_dict

    @task(multiple_outputs=True)
    def train_model(data_paths_dict: dict):
        """
        Trains a Keras model on the preprocessed MNIST dataset.
        Args:
            data_paths_dict (dict): A dictionary containing file paths to the preprocessed data arrays.
        Returns:
            model_data_paths_dict: A dictionary containing file paths to the preprocessed data arrays and the model name after training.
        """
        dirname = f"{os.path.dirname(__file__)}/data/"
        print("dirname: ", dirname)

        ## Load preprocessed train data
        y_train = np.load(data_paths_dict.get("y_train_path"))
        x_train = np.load(data_paths_dict.get("x_train_path"))

        ## Build the model
        input_shape = (28, 28, 1)
        num_classes = 10
        model = keras.Sequential(
            [
                keras.Input(shape=input_shape),
                layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
                layers.MaxPooling2D(pool_size=(2, 2)),
                layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
                layers.MaxPooling2D(pool_size=(2, 2)),
                layers.Flatten(),
                layers.Dropout(0.5),
                layers.Dense(num_classes, activation="softmax"),
            ]
        )
        model.summary()

        ## Train the model
        batch_size = 128
        epochs = 15

        model.compile(
            loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
        )
        model.fit(
            x_train, y_train, batch_size=batch_size, epochs=epochs, validation_split=0.1
        )

        model_path = f"{dirname}model.keras"
        model.save(model_path)

        model_data_paths_dict = deepcopy(data_paths_dict)
        model_data_paths_dict["model_name"] = model_path
        return model_data_paths_dict

    @task()
    def test_model(model_data_paths_dict: dict):
        """
        Tests a trained Keras model on the test dataset and prints the test loss and accuracy.
        Args:
            model_data_paths_dict: A dictionary containing file paths to the preprocessed data arrays and the model name after training.
        """
        ## Load preprocessed test data
        y_test = np.load(model_data_paths_dict.get("y_test_path"))
        x_test = np.load(model_data_paths_dict.get("x_test_path"))

        model = keras.models.load_model(model_data_paths_dict.get("model_name"))

        score = model.evaluate(x_test, y_test, verbose=0)
        print("Test loss:", score[0])
        print("Test accuracy:", score[1])

    data_paths_dict = preprocess_data()
    model_data_paths_dict = train_model(data_paths_dict)
    test_model(model_data_paths_dict)


tutorial_mnist_keras_taskflow_api()
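For local experimentation outside a scheduler, the DAG object returned by the decorated function can be executed in-process; a minimal sketch, assuming Airflow 2.5 or newer (where DAG.test() is available) and that the final call in the listing is assigned to a variable:

mnist_dag = tutorial_mnist_keras_taskflow_api()

if __name__ == "__main__":
    # Runs preprocess_data -> train_model -> test_model in a single process,
    # without requiring a running scheduler or webserver.
    mnist_dag.test()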