Introduction
If you are a beginner and are just starting to learn MLOps, you might have a question: What are MLOps?
In simple words, MLOps (Machine Learning Operations) is a set of practices for collaboration and communication between data scientists and operations professionals. Applying these practices increases the quality, simplifies the management process, and automates the deployment of Machine Learning and Deep Learning models in large-scale production environments. It’s easier to align models with business needs and regulatory requirements. In this article, we will implement our project using Prefect and CometML.
In this MLOps project, we will build the best possible Machine Learning model using optimal hyperparameters to predict the sales price of a Bulldozer. As you may know, a Bulldozer is a powerful vehicle for shallow digging and ditching.
Learning Objectives
- Learn MLOps concepts and end-to-end ML workflow.
- Implement MLOps pipeline with Prefect and CometML.
- Make reproducible, automated ML workflows.
- Evaluate and monitor ML models.
- End-to-end MLOps experience.
This article was published as a part of the Data Science Blogathon.
What is Prefect and CometML?
Prefect
Prefect is an open-source Python library that helps you define, schedule, and manage data workflows well. It simplifies orchestrating and automating complex data workflows, making tasks easier. Examples are data extraction, transformation, and model training. You can do them in a systematic and repeatable way.
pip install prefect
Another thing I should mention is Prefect Cloud. Prefect Cloud is a cloud-based platform provided by Prefect for managing, orchestrating, and monitoring data workflows in MLOps.
CometML
CometML is a platform for managing and tracking machine learning experiments in MLOps. It provides tools for versioning, collaboration, and visualizing results. It helps streamline the development and monitoring of machine-learning models.
pip install comet_ml
The MLOps project: Let’s get started.
Data Exploration
As we build an end-to-end machine learning model, we will focus more on the ML life cycle than model building.
If you observe the dataset, you will see there are 53 columns. We will use all 52 columns for input features or X, and since our target variable is SalePrice, this will be the y. In the data exploration part, we conducted all kinds of explorations, from df.info() to plotting missing values using a scatter plot. You will find all the steps in my notebook on the GitHub repository. You can also download the dataset from there. Now, let’s start working on the project.
Set Up a Virtual Environment
What is Virtual Environment, and why do we need it?
A virtual environment is a self-contained Python workspace for isolating project dependencies.
You install many libraries on your computer for several projects. You might have installed Python3.11, but sometimes, you need Python3.9 for another project. To avoid conflict, you need to set up a virtual Environment.
Creating a Virtual Environment
python -m venv myenv
#then for activation
myenv\Scripts\activate
python3 -m venv myenv
#then for activation
source myenv/bin/activate
File Structure
Configure CometML and Prefect
To configure CometML, you need to create a file named .comet.config in your project directory and define its configuration parameters. Here is an example of how you can structure a basic .comet.config file:
[comet]
api_key = your_api_key
workspace = your_workspace
project_name = your_project_name
You should sign up for Comet for an api_key, workspace, and project_name. Let’s take a look at how to set up a Comet account.
Set up a Comet account
- Please create a new account. It’s easy and free.
API key
- When your account is created in the top right corner, click your avatar, then select Account Settings.
- To get the API key, click the API Keys tab. Your current API key is displayed there. Click Copy to copy the API key.
- You can see your workspace name and project name in the Workspaces Tab.
So now let’s configure Prefect.
Set Up the Prefect
Prefect provides a cloud platform and API for managing and monitoring workflows. By signing up, we can use Prefect Cloud. It has a dashboard for tracking workflows. It can set notifications, analyze logs, and more. The interesting part is that we can deploy our machine-learning model.
pip install -U prefect
See the install guide for more details.
- Step 2: Connect to Prefect’s API
Prefect’s functionality relies on a backend cloud API. The API manages the execution of workflows and data pipelines. We need to connect Prefect installation to this API. This unlocks useful features. For example, a central dashboard can be used to watch workflow runs. It also lets you set notifications. You can get them when tasks fail, analyze logs, and track task history. Lastly, it lets you scale workloads across a cluster. We can build workflows locally without the API. But we can’t make them operational or ready for production. The Prefect Cloud handles scheduling and retries. It follows limits set through the API. So, using Prefect with its API service offers a serverless platform. It is for managing complex workflows without needing to host your own coordinators.
- Create a new account or sign in at
- Use the prefect cloud login CLI command to
Prefect cloud login
Choose Log in with a web browser and click the Authorize button in the open browser window.
Self-hosted Prefect server instance
You can also run this on your local machine. See the tutorial for help. Note that you must host your own server and run your flows on your own infrastructure.
- Step 3: Turn your function into a Prefect flow
See the flow.py file where I added the @flow decorator. This is the fastest way to get started with Prefect. A “Flow” is a Directed Acyclic Graph (DAG) representing a workflow. In Prefect, a task is a fundamental unit of work in the workflow. We will discuss tasks more later in this tutorial.
5-steps to implement this MLOps project using Prefect and CometML
Here are the 5 steps to implement the MLops project using Prefect and CometML
Step 1 – Ingest data
In this step, we ingest our data from our data folder. Let’s have a look at our ingest_data.py file inside the steps folder
class IngestData:
"""Ingests data from a CSV file."""
def __init__(self, data_path: str):
self.data_path = data_path
def get_data(self):
logging.info(f"Ingest data from {self.data_path}")
return pd.read_csv(self.data_path)
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def ingest_df(data_path: str) -> pd.DataFrame:
"""
Ingest data from the specified path and return a DataFrame.
Args:
data_path (str): The path to the data file.
Returns:
pd.DataFrame: A pandas DataFrame containing the ingested data.
"""
try:
ingest_obj = IngestData(data_path)
df = ingest_obj.get_data()
print(f"Ingesting data from {data_path}")
experiment.log_metric("data_ingestion_status", 1)
return df
except Exception as e:
logging.error(f"Error while ingesting data: {e}")
raise e
finally:
# Ensure that the experiment is ended to log all data
experiment.end()
In Prefect, a task is a fundamental unit of work in a workflow. It represents an individual computation unit or an operation that needs to be performed. So, in this case, our first task is to ingest the data.
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
This Prefect task decorator specifies caching parameters, using task_input_hash as the cache key function and setting a cache expiration of one hour. You can learn more about this in prefect doc.
Step 2 – Clean data
In this step, we will clean our data, and the bellow code will return X_train, X_test, y_train, y_test, for training and testing our ML model. Let’s have a look
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def clean_df(data: pd.DataFrame) -> Tuple[
Annotated[pd.DataFrame, 'X_train'],
Annotated[pd.DataFrame, 'X_test'],
Annotated[pd.Series, 'y_train'],
Annotated[pd.Series, 'y_test'],
]:
"""
Data cleaning class which preprocesses the data and divides it into train and test data.
Args:
data: pd.DataFrame
"""
try:
preprocess_strategy = DataPreprocessStrategy()
data_cleaning = DataCleaning(data, preprocess_strategy)
preprocessed_data = data_cleaning.handle_data()
divide_strategy = DataDivideStrategy()
data_cleaning = DataCleaning(preprocessed_data, divide_strategy)
X_train, X_test, y_train, y_test = data_cleaning.handle_data()
logging.info(f"Data Cleaning Complete")
experiment.log_metric("data_cleaning_status", 1)
return X_train, X_test, y_train, y_test
except Exception as e:
logging.error(e)
raise e
finally:
# Ensure that the experiment is ended to log all data
experiment.end()
Till this point, if you observe the above code carefully, you might be thinking, where are the DataPreprocessStrategy(), and DataDivideStrategy() defined inside the model folder, we define these methods; let’s have a look
class DataPreprocessStrategy(DataStrategy):
"""
Data preprocessing strategy which preprocesses the data.
"""
def handle_data(self, data: pd.DataFrame) -> pd.DataFrame:
try:
"""
Performs transformations on df and returns transformaed df.
"""
# Convert 'saledate' column to datetime
data['saledate'] = pd.to_datetime(data['saledate'])
data["saleYear"] = data.saledate.dt.year
data["saleMonth"] = data.saledate.dt.month
data["saleDay"] =data.saledate.dt.day
data["saleDayOfWeek"] = data.saledate.dt.dayofweek
data["saleDayOfYear"] = data.saledate.dt.dayofyear
data.drop("saledate", axis=1, inplace=True)
# Fill the numeric row with median
for label, content in data.items():
if pd.api.types.is_numeric_dtype(content):
if pd.isnull(content).sum():
# Add a binary column which tells us if the data was missing
# or not
data[label+"is_missing"] = pd.isnull(content)
# Fill missing numeric values with median
data[label] = content.fillna(content.median())
# Filled categorical missing data and turn categories into numbers
if not pd.api.types.is_numeric_dtype(content):
data[label+"is_missing"] = pd.isnull(content)
# We add +1 to the category code because pandas encodes
# missing categories as -1
data[label] = pd.Categorical(content).codes+1
return data
except Exception as e:
logging.error("Error in Data handling: {}".format(e))
raise e
In my GitHub repository, you can find all methods.
Step 3 – Train model
We will train a simple linear regression model using the Scikit learn library.
# Create a CometML experiment
experiment = Experiment()
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def train_model(
X_train: pd.DataFrame,
X_test: pd.DataFrame,
y_train: pd.Series,
y_test: pd.Series,
config: ModelNameConfig = ModelNameConfig(),
) -> RegressorMixin:
"""
Train a regression model based on the specified configuration.
Args:
X_train (pd.DataFrame): Training data features.
X_test (pd.DataFrame): Testing data features.
y_train (pd.Series): Training data target.
y_test (pd.Series): Testing data target.
config (ModelNameConfig): Model configuration.
Returns:
RegressorMixin: Trained regression model.
"""
try:
model = None
if config.model_name == "random_forest_regressor":
model = RandomForestRegressor(n_estimators=40,
min_samples_leaf=1,
min_samples_split=14,
max_features=0.5,
n_jobs=-1,
max_samples=None,
random_state=42)
trained_model = model.fit(X_train, y_train)
# Save the trained model to a file
model_filename = "trained_model.pkl"
with open(model_filename, 'wb') as model_file:
pickle.dump(trained_model, model_file)
print("train model finished")
experiment.log_metric("model_training_status", 1)
return trained_model
else:
raise ValueError("Model name not supported")
except Exception as e:
logging.error(f"Error in train model: {e}")
raise e
finally:
# Ensure that the experiment is ended to log all data
experiment.end()
Step 4 – Evaluate model
# Create a CometML experiment
experiment = Experiment()
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def evaluate_model(
model: RegressorMixin, X_test: pd.DataFrame, y_test: pd.Series
) -> Tuple[Annotated[float, "r2"],
Annotated[float, "rmse"],
]:
"""
Args:
model: RegressorMixin
x_test: pd.DataFrame
y_test: pd.Series
Returns:
r2_score: float
rmse: float
"""
try:
prediction = model.predict(X_test)
# Using the MSE class for mean squared error calculation
mse_class = MSE()
mse = mse_class.calculate_score(y_test, prediction)
experiment.log_metric("MSE", mse)
# Using the R2Score class for R2 score calculation
r2_class = R2Score()
r2 = r2_class.calculate_score(y_test, prediction)
experiment.log_metric("R2Score", r2)
# Using the RMSE class for root mean squared error calculation
rmse_class = RMSE()
rmse = rmse_class.calculate_score(y_test, prediction)
experiment.log_metric("RMSE", rmse)
# Log metrics to CometML
experiment.log_metric("model_evaluation_status", 1)
print("Evaluate model finished")
return r2, rmse
except Exception as e:
logging.error(f"Error in evaluation: {e}")
raise e
finally:
# Ensure that the experiment is ended to log all data
experiment.end()
We have logged all those metrics, like r2 score, mse, and rmse. You can see the above code. We can visualize those matrices on the CometML dashboard. However, when you run the flow, you can see the dashboard. In the next step, we discuss that.
Step 5 – Run the flow (The final step)
We have to run the flow.
We import all the tasks and flows into the flow.py file and run our flow from there.
python3 flow.py
from prefect import flow
from steps. ingest_data import ingest_df
from steps.clean_data import clean_df
from steps.train_model import train_model
from steps.evaluation import evaluate_model
## import comet_ml at the top of your file
from comet_ml import Experiment
## Create an experiment with your api key
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def my_flow():
data_path="/home/dhrubaubuntu/gigs_projects/Bulldozer-price-prediction/data/TrainAndValid.csv"
df = ingest_df(data_path)
X_train, X_test, y_train, y_test = clean_df(df)
model = train_model(X_train, X_test, y_train, y_test)
r2_score, rmse = evaluate_model(model, X_test, y_test)
# Run the Prefect Flow
if __name__ == "__main__":
my_flow()
Here, you can see all the run-in flow dashboards in Prefect
Conclusion
Implementing end-to-end MLOps enables organizations to reliably scale-out machine learning solutions in production. This tutorial demonstrated an automated workflow for predicting electric vehicle ranges using open-source libraries like Prefect and CometML.
Key highlights from the project include:
- Orchestrating an ML pipeline with Prefect involves handling steps ranging from data ingestion, preprocessing, model development, evaluation, and monitoring.
- Tracking experiments in CometML to visualize model metrics like RMSE and R2 scores over time for comparison.
- Monitoring workflow executions in Prefect Cloud showing task durations.
Overall, this showcase implements data science best practices of automation, reproducibility, and monitoring in a structured workflow critical for real-world ML systems. Extending and operationalizing to production can further leverage Prefect’s scalability in managing large-scale flows across distributed infrastructure.
Key Takeaways
Some key takeaways from this end-to-end MLOps tutorial include:
- Implementing MLOps improves data scientists and IT collaboration with automation and DevOps practices.
- Prefect enables the creation of robust data pipelines and workflows to ingest, process, train, and evaluate models.
- CometML provides an easy way to track ML experiments with logging and visualization.
- Orchestrating the ML lifecycle end-to-end ensures models remain relevant as new data comes in.
- Monitoring workflow executions helps identify and troubleshoot failures quickly.
- MLOps unlocks faster experimentation by simplifying retraining and deployment of updated models.
Frequently Asked Questions
Ans. MLOps for machine learning is a set of practices that aims to streamline and automate the end-to-end machine learning lifecycle, including model development, deployment, and maintenance, to enhance collaboration and efficiency in data science and operations teams.
Ans. Prefect is an open-source Python library for workflow management. It enables the creation, scheduling, and orchestration of data workflows and tasks commonly used in data science and automation pipelines. It simplifies complex workflows, focusing on flexibility, reliability, and monitoring.
Ans. CometML is a platform for machine learning experimentation and collaboration. It provides tools for tracking, comparing, and optimizing machine learning experiments, enabling teams to log and share experiment details, metrics, and visualizations to improve model development and collaboration.
Ans. Prefect is used for workflow management in data science and automation. It helps streamline and orchestrate complex data workflows, making designing, scheduling, and cohesively monitoring tasks easier. Prefect is commonly employed for data processing, machine learning model training, and other data-centric operations, providing a framework for building, running, and managing workflows efficiently.
Ans. MLflow is an open-source platform for managing the end-to-end machine learning lifecycle, including experiment tracking, packaging code into reproducible runs, and sharing and deploying models. Comet is a platform for machine learning experimentation and collaboration, focusing on experiment tracking, visualizations, and collaboration features. It provides a centralized hub for teams to analyze and share results. While both support experiment tracking, MLflow offers additional model packaging and deployment features, while Comet emphasizes collaboration and visualization capabilities.
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.