Airflow + MLflow stack
This tutorial combines several of the most popular MLOps tools to showcase what your workflow would look like using these tools, from experimentation to production. The experimentation loop uses Jupyter, MLflow, and Git with DVC. The production loop consists of Git with DVC, Airflow, MLflow, BentoML, Prometheus, and Grafana.
🧐 Spotting haters on the Web
To see this stack in action, we use Kaggle's Toxic Comment Classifier dataset as an example. This is a Natural Language Processing (NLP) problem where we analyze comments published on Wikipedia to spot hateful messages.
Tutorial with codeView on GitHub
See below a step-by-step guide on how to implement this stack.
Launching a pre-configured stack is the easiest way to follow along:
Pre-configured cloud machine with databases and repositories
All tools installed and configured
Get free credits when you sign up
Alternatively, you can install and configure the tools on your computer.
We start by exploring and understanding the data. Next, we test three different embeddings: bag-of-words, TF-IDF, and Word2vec, using the K-nearest neighbors (KNN) classifier to train the models. Our goal is to select the best model to take to production.
Edit the notebook
The experimentation flow starts from the notebook and your data. Version the dataset with DVC and the code with Git before running the notebook to create reproducible experiments. Doing so allows MLflow to track the data and code used during training and store this information together with your experiment results.
Version the data
To start versioning the data, you must tell DVC which files to version. Do this by using the command below, specifying the data files, and pushing the changes to the DVC repository.
dvc add data/train.csv dvc push
DVC will generate a human-readable
.dvc file. This file is a data placeholder and must be committed with your code using Git. Your actual data files will be included in
.gitignore and will not be pushed to your Git repo, keeping the data and code layers separate.
Version the code
Use Git to stage the
.dvc file and any modifications made to the notebook or any other relevant code in the project.
git add data/train.csv.dvc notebooks/ToxicCommentClassifier.ipynb
Finally, commit the changes.
git commit -m 'Comment about the code and data'
After completing these steps, you will effectively create a snapshot of your data and code. You can use the Git commit sha to retrieve that snapshot anytime. Optionally, you can mark specific points in your repo's history using Git tags to retrieve them more easily.
Track metrics and artifacts
Use MLflow to track metrics from your experiments. We use the
autolog function to track most metrics and artifacts automatically.
You can also manually log additional parameters. For example, we manually associate the Git commit sha with the current MLflow run.
This means you can go back in time and see the exact data and code that produced those results.
Run the notebook and iterate
Run your notebook and check your results in MLflow. Rinse and repeat. Make a change to the code or data, then use DVC and Git to version the changes. When you rerun your experiment, MLflow will track and associate your results with the data and code versions you used. Over time, you will have a list of experiments in MLflow. You can reproduce any of them and pick the best one.
In our tutorial, the best results were produced by the Word2vec embedding. We select this model to take to production in the following steps.
In the production workflow, we create a pipeline to automate data and code versioning, data preprocessing, and finally, model retraining and serving.
Create a pipeline
The pipeline is created using Airflow and defined in a
.py file. A pipeline is also known as a Direct Acyclic Graph (DAG). It automates all necessary steps to go from data to a deployed model. Each pipeline run will retrain and redeploy the model based on the latest state of your dataset. Airflow triggers the pipeline at a fixed schedule, assuming your application regularly collects new data between the triggers.
Add data versioning step
Similarly to the experimentation flow, you want to automatically version your dataset with DVC, so you know which data produced that model. In Airflow, we use the
BashOperator to execute the same DVC versioning commands we used during experimentation.
Add code versioning step
The next pipeline step uses the
BashOperator to version the
.dvc file and other code changes using Git commands. This creates a snapshot of your data and code's current state, enabling the reproduction of the results.
Add training step
We'll manually copy and refactor the preprocessing and training code from our notebook and add it as a step in the pipeline. Once again, we use MLflow to track training metrics and artifacts. We also use MLflow's model registry to track which model versions are in staging and in production. Promoting a model to production in MLflow does not mean the model is actually deployed. The information in MLflow's registry is just for logging purposes, and the deployment is independent of MLflow.
Add model deployment steps
We use BentoML to build and serve the models. BentoML packages the model by importing the relevant files from MLflow and creates a Docker container to run it in production. You must create a
.py file defining the service endpoints and a
bentofile.yaml configuration file. We use Airflow to bring the container online with Docker Compose, configured through a
Export the model serving metrics
BentoML exports serving metrics out-of-the-box. We configure Prometheus to regularly collect these metrics. You can also define your own custom metrics if needed.
Monitor the model
The metrics collected by Prometheus are used to create monitoring dashboards with Grafana. An example dashboard is provided as a
.json file. This can be imported and customized. You can also use Grafana to create alerts based on the metrics to warn you when something is not working correctly.
Finally, we test our setup using a notebook to make prediction requests to the deployed model. After sending some requests, you can visualize the metrics in Grafana. The prediction requests in a typical production workflow would come from your own application.
Toxic Comment Classifier
1 2 3 4
import numpy as np import pandas as pd import seaborn as sns from matplotlib import pyplot as plt
First, we load the dataset of Wikipedia comments from the
data folder. This dataset was obtained from Toxic Comment Classification Challenge Kaggle competition. Each row contains the comment text and labels for toxic behavior:
1 2 3 4 5 6 7 8 9
import os # We limit the number of rows loaded from the dataset to speed up training time NROWS = 5000 tutorial_dir_path = os.path.dirname(os.path.abspath(os.getcwd())) train_data_path = os.path.join(tutorial_dir_path, "data/train.csv") train = pd.read_csv(train_data_path, nrows=NROWS)
Let's explore the data to visualize what it looks like and understand the distribution across the toxicity behaviors.
1 2 3
print("Train data shape: ", train.shape) # sample rows to visualize train.head()
1 2 3 4
sentence_lengths = [len(sentence) for sentence in train['comment_text']] plt.hist(sentence_lengths,500) plt.xlabel('Length of comments') plt.show()
1 2 3 4 5 6 7 8 9 10 11
feature = train.drop(['id', 'comment_text'], axis=1) ### Removed unnecessary columns - id and comment_text counts =  ### A list that contains tuple which consists of class label and number of comments for that particular class categories = list(feature.columns.values) for i in categories: counts.append((i, feature[i].sum())) df_1 = pd.DataFrame(counts, columns=['Feature Labels', 'Total Comments']) ### Dataframe made up of category and total number of comments df_1.plot(x='Feature Labels', y='Total Comments', kind='bar', figsize=(8,8)) plt.title("Comments per category") plt.ylabel('Total comments', fontsize=12) plt.xlabel('Feature Labels', fontsize=12)
1 2 3 4 5 6
from sklearn.model_selection import train_test_split X = train['comment_text'] y = train.iloc[:,2:] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
We preprocess the text using these rules:
- Remove special chars
- Remove punctuation
- Convert to lowercase
- Replace numbers
- Tokenize text
- Remove stopwords
- Lemmatize words
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
from sklearn.base import BaseEstimator class TextPreprocessor(BaseEstimator): """TextPreprocessor preprocesses text by applying these rules: - Remove special chars - Remove punctuation - Convert to lowercase - Replace numbers - Tokenize text - Remove stopwords - Lemmatize words It implements the BaseEstimator interface and can be used in sklearn pipelines. """ def remove_special_chars(self, text): import re import html re1 = re.compile(r' +') x1 = text.lower().replace('#39;', "'").replace('amp;', '&').replace('#146;', "'").replace( 'nbsp;', ' ').replace('#36;', '$').replace('\\n', "\n").replace('quot;', "'").replace( '<br />', "\n").replace('\\"', '"').replace('<unk>', 'u_n').replace(' @.@ ', '.').replace( ' @-@ ', '-').replace('\\', ' \\ ') return re1.sub(' ', html.unescape(x1)) def remove_punctuation(self, text): """Remove punctuation from list of tokenized words""" import string translator = str.maketrans('', '', string.punctuation) return text.translate(translator) def to_lowercase(self, text): return text.lower() def replace_numbers(self, text): """Replace all interger occurrences in list of tokenized words with textual representation""" import re return re.sub(r'\d+', '', text) def text2words(self, text): from nltk.tokenize import word_tokenize return word_tokenize(text) def remove_stopwords(self, words): """ :param words: :type words: :param stop_words: from sklearn.feature_extraction.stop_words import ENGLISH_STOP_WORDS or from spacy.lang.en.stop_words import STOP_WORDS :type stop_words: :return: :rtype: """ from nltk.corpus import stopwords stop_words = stopwords.words('english') return [word for word in words if word not in stop_words] def lemmatize_words(self, words): """Lemmatize words in text""" from nltk.stem import WordNetLemmatizer lemmatizer = WordNetLemmatizer() return [lemmatizer.lemmatize(word) for word in words] def lemmatize_verbs(self, words): """Lemmatize verbs in text""" from nltk.stem import WordNetLemmatizer lemmatizer = WordNetLemmatizer() return ' '.join([lemmatizer.lemmatize(word, pos='v') for word in words]) def clean_text(self, text): text = self.remove_special_chars(text) text = self.remove_punctuation(text) text = self.to_lowercase(text) text = self.replace_numbers(text) words = self.text2words(text) words = self.remove_stopwords(words) words = self.lemmatize_words(words) words = self.lemmatize_verbs(words) return ''.join(words) def fit(self, X, y=None): return self def transform(self, X): return map(lambda x: self.clean_text(x), X)
Tracking experiment with MLflow
We use a mix of manual and automatic logging to record training runs with MLflow.
mlflow.sklearn.autolog does the heavy lifting of tracking most metrics and parameters. We complement these metrics by logging some results manually.
Important: Because the git commit sha is not automatically tracked by MLflow when logging data on Jupyter notebooks, we must log it manually as a workaround. This allows us to connect experiment results to the version of the code and data that generated it.
MLflow run ->
Git commit ->
DVC data version
1 2 3 4 5 6 7 8 9
import mlflow mlflow.set_tracking_uri("http://localhost:5000") mlflow.set_experiment("Toxic Comment Classifier") mlflow.sklearn.autolog(silent=True) import git repo = git.Repo(search_parent_directories=True) sha_commit = repo.head.object.hexsha
Training the models
We try three different approaches where we play with different vectorization methods: bag of words, TF-IDF and word2vec. The K-nearest-neighbors classifier remains the same for all three experiments. In summary:
- Bag of words + K-nearest-neighbors
- TF-IDF + K-nearest-neighbors
- Word2vec + K-nearest-neighbors
1 2 3 4 5 6 7 8 9 10
from sklearn.feature_extraction.text import CountVectorizer from sklearn.neighbors import KNeighborsClassifier from sklearn.pipeline import Pipeline from sklearn.metrics import f1_score p = Pipeline([ ('preprocessing', TextPreprocessor()), ('bow', CountVectorizer(min_df=2, max_features=1000)), ('knn', KNeighborsClassifier(n_neighbors=6)) ])
1 2 3 4 5 6 7 8
mlflow.start_run() mlflow.set_tag('mlflow.source.git.commit', sha_commit) mlflow.set_tag('method', 'bag of words') p.fit(X_train, y_train) p.score(X_test, y_test) mlflow.end_run()
1 2 3 4 5 6 7 8 9
from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.neighbors import KNeighborsClassifier from sklearn.pipeline import Pipeline p = Pipeline([ ('preprocessing', TextPreprocessor()), ('tfidf', TfidfVectorizer(ngram_range=(1, 2), min_df=2, max_features=1000)), ('knn', KNeighborsClassifier(n_neighbors=6)) ])
1 2 3 4 5 6 7 8
mlflow.start_run() mlflow.set_tag('mlflow.source.git.commit', sha_commit) mlflow.set_tag('method', 'tfidf') p.fit(X_train, y_train) p.score(X_test, y_test) mlflow.end_run()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
from sklearn.base import BaseEstimator from gensim.models import Word2Vec class Word2vecTransformer(BaseEstimator): """ Wor2vecTransformer provides a wrapper around gensim's Word2Vec model to be used in sklearn's pipeline. """ def __init__(self, min_count=5, vector_size=100, window=5): self.min_count = min_count self.vector_size = vector_size self.window = window self.sentences =  def fit(self, x, y=None): self.sentences = list(map(lambda k: k.split(), x)) self.model = Word2Vec(min_count=self.min_count, vector_size=self.vector_size, window=self.window, sg=1) self.model.build_vocab(self.sentences, progress_per=10000) self.model.train(self.sentences, total_examples=self.model.corpus_count, epochs=20) return self def transform(self, x): sentences = list(map(lambda k: k.split(), x)) if len(sentences) != 0: self.sentences = sentences w2v_words = list(self.model.wv.index_to_key) # We calculate the sentence embedding as the average of the embedding of the words in the sentence vector =  for sentence in self.sentences: sentence_vec = np.zeros(self.vector_size) count = 0 for word in sentence: if word in w2v_words: vec = self.model.wv[word] sentence_vec += vec count += 1 if count != 0: sentence_vec /= count # averaging vector.append(sentence_vec) return vector
1 2 3 4 5 6 7 8
from sklearn.neighbors import KNeighborsClassifier from sklearn.pipeline import Pipeline p = Pipeline([ ('preprocessing', TextPreprocessor()), ('word2vec', Word2vecTransformer()), ('knn', KNeighborsClassifier(n_neighbors=6)) ])
1 2 3 4 5 6 7 8
mlflow.start_run() mlflow.set_tag('mlflow.source.git.commit', sha_commit) mlflow.set_tag('method', 'word2vec') p.fit(X_train, y_train) p.score(X_test, y_test) mlflow.end_run()