Integrating Great Expectations with an Airflow DAG is essential for modern businesses that rely heavily on data to drive operations, innovation, and decision-making. Poor data quality can have devastating consequences, resulting in misguided strategies, increased operating expenses, and even regulatory fines.
In an era where data travels via sophisticated pipelines on multiple platforms, it is more crucial than ever to guarantee its quality.
To effectively implement such Data Engineering best practices, consider hiring a skilled Data Engineer from Vivasoft.
With their expertise, they can help you automate data quality checks, optimize your workflows, and ensure that your data remains reliable and consistent throughout your processes.
Investing in a Vivasoft data engineer ensures that your data infrastructure is robust, scalable, and aligned with your business goals.
Project Overview
This is where Great Expectations and Apache Airflow come into play. Great Expectations is an outstanding open-source framework for defining and validating expectations about your data, ensuring that it meets your requirements before being consumed by downstream processes.
Apache Airflow, on the other hand, is a powerful platform for managing complex processes and data pipelines. In this blog, we’ll walk you through the process of integrating Great Expectations with an Airflow DAG, allowing you to automate and expand your data quality procedures effectively.
Technology Used
- Python
- Apache Airflow
- Great Expectations
Why Integrate Great Expectations with Airflow?
Integrating Great Expectations with Apache Airflow provides an elegant way to automate data quality checks throughout your pipelines.
By incorporating validation jobs directly into your Airflow DAGs, you can verify that data quality is consistent as it moves from one task to the next.
This integration improves the dependability of your data pipelines while also allowing you to scale your data quality efforts across several datasets and workflows.
Setting Up Environment
- Create and activate a Python virtual environment
python3 -m venv venv_name
source venv_name/bin/activate
- Install Great Expectations
pip3 install great_expectations
- Install Apache Airflow
pip3 install apache-airflow
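After installing both packages, a quick import check confirms that they resolve inside the same virtual environment. The snippet below is a minimal sanity check; the versions printed simply reflect whatever pip installed.

# Minimal sanity check: both libraries should import from the activated environment.
import airflow
import great_expectations as gx

print("Apache Airflow:", airflow.__version__)
print("Great Expectations:", gx.__version__)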
Top Level Workflow
Project Directory Structure
Local environment → Airflow project directory (the dags/ folder that Airflow scans) → DAG files (*.py) and sample data (*.csv)
Configuring Great Expectations
1. Setting Up Data Sources
Before you can set any expectations, you must first specify the data sources that Great Expectations will validate. Data sources in Great Expectations are connections to your datasets, which can be files, databases, or data lakes.
- Defining a Data Source: You can create a data source either with the Great Expectations CLI or programmatically in Python, connecting to conventional sources such as a Pandas DataFrame, a SQL database, or Google BigQuery. A minimal Python sketch follows below.
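As a minimal sketch, connecting a CSV file through the default pandas data source looks like this in Python (the file path here is illustrative, not from the project):

import great_expectations as gx

# Create a data context and read a CSV through the built-in default pandas data source.
# "employees.csv" is an illustrative path; replace it with your own dataset.
context = gx.get_context()
validator = context.sources.pandas_default.read_csv("employees.csv")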
2. Creating Expectations
With a data source in place, you can define expectations: declarative rules that describe what valid data looks like.
- Types of Expectations: Common expectations include ensuring a column contains no null values, checking that values fall within a given range, and validating column uniqueness.
- Creating Expectations: Expectations can be defined through the CLI or directly in Python; this post uses the Python approach, previewed in the short example below and shown in full in the next section.
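As a quick preview, and assuming the validator created in the sketch above, the three kinds of checks just listed look like this (the column name "id" matches the sample data used later):

# Common expectation types on a validator; these same calls appear in the full code below.
validator.expect_column_values_to_not_be_null(column="id")                            # no missing values
validator.expect_column_values_to_be_between(column="id", min_value=1, max_value=10)  # value range
validator.expect_column_values_to_be_unique(column="id")                              # uniqueness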
3. Full code
Now that the data source is set up, you can start defining expectations: declarative statements that your data must satisfy.
Here’s an example of creating some basic expectations using Python:
import great_expectations as gx

# csv_file is the path to the CSV checked by the upstream fetch task
# (assumed to be defined at module level in the DAG file).

def _validate_gx():
    # create data context
    context = gx.get_context()
    # connect to data
    validator = context.sources.pandas_default.read_csv(csv_file)
    # create expectations
    validator.expect_column_to_exist(column="id")
    validator.expect_table_column_count_to_be_between(min_value=1, max_value=3)
    validator.expect_table_row_count_to_be_between(min_value=1, max_value=100)
    validator.expect_column_values_to_not_be_null(column="id",
                                                  notes="**identification** of each employee")
    validator.expect_column_values_to_be_between(column="id", min_value=1, max_value=10)
    validator.expect_column_values_to_be_unique(column="id")
    validator.expect_column_unique_value_count_to_be_between(column="id", min_value=1, max_value=10)
    validator.save_expectation_suite(discard_failed_expectations=False)
    # create a checkpoint
    checkpoint = context.add_or_update_checkpoint(
        name="my_quickstart_checkpoint",
        validator=validator,
    )
    # run the checkpoint and read the overall validation result
    checkpoint_result = checkpoint.run()
    result = dict(checkpoint_result)["_success"]
    print(f"checkpoint result: {result}")
    # return the task_id of the branch to follow (consumed by BranchPythonOperator)
    if result:
        return 'validation_passed'
    else:
        return 'validation_failed'
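Note that _validate_gx returns the task_id of the branch to follow ('validation_passed' or 'validation_failed') rather than a boolean. This is what lets Airflow's BranchPythonOperator, shown in the next section, route the DAG down the correct path based on the checkpoint result.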
Integrating with Airflow
When Great Expectations and Apache Airflow are integrated, data quality checks become a built-in step of your data pipelines.
This means you can make sure the data satisfies your established quality criteria before it is used for downstream operations like reporting, transformations, or machine learning models.
In addition to improving data reliability, automating these checks within Airflow helps you identify problems promptly and reduces the risk of faulty data spreading across your pipeline.
1. Example code
from airflow.operators.python import BranchPythonOperator

validate_campaign_data = BranchPythonOperator(task_id='validate_campaign_data',
                                              task_display_name='🛢️ validate dataframe data',
                                              python_callable=_validate_gx)
2. Full Code
from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.edgemodifier import Label

# default_args, _fetch_data_from_csv and _validate_gx are defined earlier in the same DAG file.

with DAG(dag_id='DagDataQuality'
         , dag_display_name="📊 Data quality check with great expectations"
         , default_args=default_args
         , description='A simple test DAG for data quality checks with Great Expectations'
         , start_date=datetime(2024, 7, 2)
         , schedule_interval=None  # manual trigger
         , catchup=False):

    # Tasks
    # =======
    fetch_data_from_csv = BranchPythonOperator(task_id='fetch_data_from_csv',
                                               task_display_name='📄 fetch data from csv',
                                               python_callable=_fetch_data_from_csv)

    validate_campaign_data = BranchPythonOperator(task_id='validate_campaign_data',
                                                  task_display_name='🛢️ validate dataframe data',
                                                  python_callable=_validate_gx)

    empty_data = PythonOperator(task_id='empty_data',
                                task_display_name='🗑️ empty data',
                                python_callable=lambda: print("No data is available"))

    validation_passed = PythonOperator(task_id='validation_passed',
                                       task_display_name='💯 data validation passed',
                                       python_callable=lambda: print("validation is successful"))

    validation_failed = PythonOperator(task_id='validation_failed',
                                       task_display_name='❌ data validation failed',
                                       python_callable=lambda: print("validation failed"))

    # task flow
    fetch_data_from_csv >> Label("data available") >> validate_campaign_data
    fetch_data_from_csv >> Label("data unavailable") >> empty_data
    validate_campaign_data >> Label("data quality check is successful") >> validation_passed
    validate_campaign_data >> Label("data quality check is unsuccessful") >> validation_failed
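The DAG also references a _fetch_data_from_csv callable that is not shown in this post. As a rough sketch (the emptiness check and the reuse of csv_file are assumptions), it reads the CSV and returns the task_id of the branch to run:

import pandas as pd

def _fetch_data_from_csv():
    # Hypothetical sketch: branch on whether the CSV contains any rows.
    # csv_file is the same module-level path used by _validate_gx.
    df = pd.read_csv(csv_file)
    if df.empty:
        return 'empty_data'              # "data unavailable" branch
    return 'validate_campaign_data'      # "data available" branch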
Data quality validation pipeline
Conclusion
Integrating Great Expectations with Apache Airflow offers a powerful way to automate data quality checks in your pipelines.
By defining expectations and embedding them in Airflow DAGs, you can ensure that only high-quality data flows through your system. This integration not only boosts the reliability and credibility of your data, it also streamlines the validation process, making it a core part of your workflow.
With this configuration, you are well-equipped to detect and tackle data quality issues before they impact downstream processes.