Great Expectations with Airflow – Integrating Data Quality into Your Pipelines

Last Update: September 24, 2024
Contributors: Vivasoft Data Engineering Team

Integrating Great Expectations with Airflow DAGs is essential for modern businesses that rely heavily on data to drive operations, innovation, and decision-making. Poor data quality can have devastating consequences, resulting in misguided strategies, increased operating expenses, and even regulatory fines.

In an era where data travels through sophisticated pipelines across multiple platforms, guaranteeing its quality is more crucial than ever.

To effectively implement such Data Engineering best practices, consider hiring a skilled Data Engineer from Vivasoft.

With their expertise, they can help you automate data quality checks, optimize your workflows, and ensure that your data remains reliable and consistent throughout your processes.

Investing in a Vivasoft data engineer ensures that your data infrastructure is robust, scalable, and aligned with your business goals.

Project Overview

This is where Great Expectations and Apache Airflow come into play. Great Expectations is an open-source framework for defining and validating expectations about your data, ensuring it aligns with your requirements before it is consumed by downstream processes.

Apache Airflow, on the other hand, is a powerful platform for managing complex processes and data pipelines. In this blog, we’ll walk you through the process of integrating Great Expectations with an Airflow DAG, allowing you to automate and expand your data quality procedures effectively.

Technology Used

  • Python
  • Apache Airflow
  • Great Expectations

Why Integrate Great Expectations with Airflow?

Integrating Great Expectations with Apache Airflow provides an elegant way to automate data quality checks throughout your pipelines.

By incorporating validation jobs directly into your Airflow DAGs, you can verify that data quality is consistent as it moves from one task to the next.

This integration improves the dependability of your data pipelines while also allowing you to scale your data quality efforts across several datasets and workflows.

Setting Up the Environment

  • Create a Python virtual environment

    python3 -m venv venv_name

  • Install Great Expectations

    pip3 install great_expectations

  • Install Apache Airflow

    pip3 install apache-airflow

Top Level Workflow

Top-Level Data Quality Validation Workflow

Project Directory Structure

Local environment → Airflow project directory → *.py and *.csv files
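For reference, a minimal layout along those lines might look like the following (the file and directory names are illustrative; the DAG file and the CSV simply need to be readable by Airflow):

airflow/
└── dags/
    ├── dag_data_quality.py    # the DAG plus the _fetch_data_from_csv and _validate_gx callables
    └── campaign_data.csv      # the CSV file validated by Great Expectations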

Configuring Great Expectations

1. Setting Up Data Sources

Before you can set any expectations, you must first specify the data sources that Great Expectations will validate. Data sources in Great Expectations are connections to your datasets, which can be files, databases, or data lakes.

  • Defining a Data Source: A data source can be created with the Great Expectations CLI or programmatically in Python, for example by pointing it at a Pandas DataFrame, a SQL database, or Google BigQuery, as shown in the sketch below.
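The following sketch registers data sources programmatically with the Fluent API. It is illustrative only and assumes Great Expectations 0.17/0.18; the data source names, file paths, and connection string are placeholders, so adjust them to your environment.

import great_expectations as gx

context = gx.get_context()

# quickest option for a local CSV file: the built-in pandas_default data source,
# which is what the full example later in this post relies on
validator = context.sources.pandas_default.read_csv("campaign_data.csv")

# a filesystem-backed pandas data source for a directory of CSV files
csv_source = context.sources.add_pandas_filesystem(name="local_csvs", base_directory="data/")
csv_asset = csv_source.add_csv_asset(name="campaign_csv", batching_regex=r".*\.csv")

# a SQL-backed data source (swap in your own connection string)
sql_source = context.sources.add_sql(
    name="warehouse",
    connection_string="postgresql+psycopg2://user:password@localhost:5432/analytics",
)
sales_table = sql_source.add_table_asset(name="sales", table_name="sales")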


2. Creating Expectations

Once a data source is configured, whether it is a Pandas DataFrame, a SQL database, or Google BigQuery, you can begin attaching expectations to it.

  • Types of Expectations: Common expectations include verifying that columns contain no null values, checking that values fall within expected ranges, and validating column uniqueness.

  • Creating Expectations: Expectations can be defined either through the CLI or programmatically in Python, as the full example in the next section demonstrates.


3. Full code

Now that the data source is set up, you can start defining expectations. Expectations are declarative statements describing the conditions your data must satisfy before it is considered fit for use.


Here’s an example of creating some basic expectations using Python:

import great_expectations as gx

# path to the CSV written by the upstream fetch task (illustrative name)
csv_file = "campaign_data.csv"


def _validate_gx():
    # create a data context
    context = gx.get_context()

    # connect to the data with the default pandas data source
    validator = context.sources.pandas_default.read_csv(csv_file)

    # create expectations
    validator.expect_column_to_exist(column="id")
    validator.expect_table_column_count_to_be_between(min_value=1, max_value=3)
    validator.expect_table_row_count_to_be_between(min_value=1, max_value=100)
    validator.expect_column_values_to_not_be_null(
        column="id",
        notes="**identification** of each employee")
    validator.expect_column_values_to_be_between("id", min_value=1, max_value=10)
    validator.expect_column_values_to_be_unique(column="id")
    validator.expect_column_unique_value_count_to_be_between(
        column="id", min_value=1, max_value=10)

    # persist the expectation suite
    validator.save_expectation_suite(discard_failed_expectations=False)

    # create a checkpoint
    checkpoint = context.add_or_update_checkpoint(
        name="my_quickstart_checkpoint",
        validator=validator,
    )

    # run the checkpoint and read the overall validation result
    checkpoint_result = checkpoint.run()
    result = checkpoint_result.success
    print(f"checkpoint result: {result}")

    # return the task_id of the branch to follow in the DAG
    if result:
        return 'validation_passed'
    else:
        return 'validation_failed'

Integrating with Airflow

When Great Expectations and Apache Airflow are integrated, data quality checks become part of the pipeline itself.

This means you can confirm that the data satisfies your established quality criteria before it is used for downstream operations such as reporting, transformations, or machine learning models.

In addition to improving data reliability, automating these checks within Airflow helps you identify problems promptly and reduces the risk of faulty data spreading across your pipeline.

1. Example code

				
validate_campaign_data = BranchPythonOperator(
    task_id='validate_campaign_data',
    task_display_name='🛢️ validate dataframe data',
    python_callable=_validate_gx)

2. Full Code

				
# assumed imports for this DAG (dag_display_name/task_display_name require Airflow 2.9+);
# default_args, _fetch_data_from_csv and _validate_gx are defined elsewhere in the DAG file
from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.edgemodifier import Label

with DAG(dag_id='DagDataQuality',
         dag_display_name="📊 Data quality check with great expectations",
         default_args=default_args,
         description='A simple test DAG, triggered manually',
         start_date=datetime(2024, 7, 2),
         schedule_interval=None,  # manual trigger
         catchup=False):
    # Tasks
    # =======
    fetch_data_from_csv = BranchPythonOperator(
        task_id='fetch_data_from_csv',
        task_display_name='📄 fetch data from csv',
        python_callable=_fetch_data_from_csv)

    validate_campaign_data = BranchPythonOperator(
        task_id='validate_campaign_data',
        task_display_name='🛢️ validate dataframe data',
        python_callable=_validate_gx)

    empty_data = PythonOperator(
        task_id='empty_data',
        task_display_name='🗑️ empty data',
        python_callable=lambda: print("No data is available"))

    validation_passed = PythonOperator(
        task_id='validation_passed',
        task_display_name='💯 data validation passed',
        python_callable=lambda: print("validation is successful"))

    validation_failed = PythonOperator(
        task_id='validation_failed',
        task_display_name='❌ data validation failed',
        python_callable=lambda: print("validation failed"))

    # task flow
    fetch_data_from_csv >> Label("data available") >> validate_campaign_data
    fetch_data_from_csv >> Label("data unavailable") >> empty_data
    validate_campaign_data >> Label("data quality check is successful") >> validation_passed
    validate_campaign_data >> Label("data quality check is unsuccessful") >> validation_failed
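The DAG above also references a _fetch_data_from_csv branch callable that is not shown in this post. A minimal sketch of what it could look like, assuming the same illustrative CSV path as before, is:

import pandas as pd

csv_file = "campaign_data.csv"  # illustrative path; match whatever the real fetch task uses

def _fetch_data_from_csv():
    # branch callable: return the task_id of the next task to run
    try:
        df = pd.read_csv(csv_file)
    except FileNotFoundError:
        return 'empty_data'
    return 'validate_campaign_data' if not df.empty else 'empty_data'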
				
			

Data quality validation pipeline

Data Quality Validation Pipeline Walkthrough

Conclusion

Integrating Great Expectations with Apache Airflow offers a powerful approach to automate data quality checks in your pipelines.

By specifying expectations and embedding them in Airflow DAGs, you can ensure that only high-quality data flows through your system. This integration not only boosts the reliability and credibility of your data but also streamlines the validation process, making it an integral part of your workflow.

With this configuration, you are well-equipped to detect and tackle data quality issues before they impact downstream processes.
