Leverage the Power of Airflow To Build an End-to-End Data Pipeline

Last Update: July 5, 2024
Project Overview

Apache Airflow is a crucial part of contemporary data warehousing since it is an effective tool for coordinating data pipelines. During the implementation stage, we concentrate on using Airflow to manage and automate workflows, guaranteeing smooth data transformation and integration. Below is a summary of the main steps involved in setting up Apache Airflow. We advise you to view the first part of the series before proceeding.

Technology Used

Python, Apache Airflow, BigQuery, GCP Composer, dbt

Objectives

  • Streamline Data Ingestion: Extract data from social media networks such as Facebook, LinkedIn, TikTok, Snapchat, and Reddit, as well as from legacy databases such as MySQL and MSSQL.

  • Parallel Data Processing: Because each data source is independent, data can be ingested from the APIs into BigQuery concurrently to improve performance. Several cleanup jobs also run in parallel to remove leftover CSV files from multiple directories.

  • Error Handling and Fault Tolerance: The pipeline must handle interruptions and failures gracefully. Airflow provides a built-in retry mechanism that re-runs any task that is interrupted or fails.

  • Audit Log: Airflow's log monitoring simplifies debugging and performance tracking by letting you view and analyze detailed logs for each task instance in the Airflow web UI.

Technology Stack

1. Apache Airflow

  • DAGs for Social Media Data Sources: A dedicated Directed Acyclic Graph (DAG) was created in Airflow to manage the extraction, loading, and transformation (ELT) tasks for each social media data source (TikTok, Reddit, Snapchat, LinkedIn, Facebook). This approach kept the pipeline modular and made process management straightforward.

  • Parallel Data Processing: Because each data source is independent, Airflow's task parallelism allows data from several sources to be processed simultaneously, maximizing throughput and reducing processing time.

  • Fault Tolerance with Backup DAGs: Backup DAGs were implemented to guarantee pipeline continuity and fault tolerance. In the event of disruptions or failures, these backup DAGs can immediately resume operations, minimizing downtime and data loss.

  • File Cleanup and Maintenance: Dedicated Airflow tasks performed regular file cleanup to conserve storage resources and maintain data hygiene, ensuring that unnecessary or leftover files (such as intermediate CSVs) from ELT processing were removed as soon as possible.

2. Google BigQuery

  • Columnar Storage: LinkedIn alone has more than 190 data sources, and some social media sources, such as Facebook, expose more than 500 data fields. BigQuery's columnar storage format maximizes query performance by enabling rapid retrieval and analysis of specific columns within enormous datasets, which is crucial in the adtech sector where instant insights matter.

  • Database partitioning and clustering mechanism: used to partition the campaign tables by date.

  • Scheduling and storing options for SQL queries.
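For example, date partitioning and clustering for one of the campaign tables could be declared with BigQuery DDL along these lines (the column names beyond row_id are assumptions for illustration):

```sql
CREATE TABLE IF NOT EXISTS campaign.snapchat_campaign (
  row_id      STRING,
  campaign_id STRING,
  impressions INT64,
  spend       FLOAT64,
  date        DATE
)
PARTITION BY date          -- one partition per day keeps date-range scans cheap
CLUSTER BY campaign_id;    -- co-locates rows for the same campaign
```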

3. GCP Composer

  • Dynamic Configuration Management: Utilized the adaptability of Cloud Composer to quickly set up and modify Apache Airflow configurations, Python package dependencies (PyPI libraries), and environment variables as needed. This made it possible to adjust to changing project requirements without experiencing any downtime.

  • Auto Scaling: Relied on Cloud Composer's autoscaling to adjust the number of Airflow workers automatically with workload demand, so parallel ingestion jobs received the capacity they needed without manual resizing.

  • Worker, Scheduler, and Webserver Configuration: Cloud Composer was used to configure and maintain Airflow components, including workers for task execution, schedulers for job scheduling, and the webserver for user interface interaction. This streamlined management has expedited deployment and maintenance tasks.

  • Trigger Configuration: Created trigger settings in Cloud Composer to automate workflow execution based on predefined events or conditions. This eliminated the need for manual intervention while enhancing workflow automation and efficiency.

4. dbt

It will be covered in the final part of the series.

Top Level System Design

Top level data pipeline design

Setting Up Apache Airflow

Prepare the Environment

  • Managing dependencies in a virtual environment is recommended:
    • python3 -m venv airflow_venv
    • source airflow_venv/bin/activate

Install Airflow

  • Use pip to install Apache Airflow. It's important to pin the version and include any required extras.
    • pip install apache-airflow==2.9.2

Initial Configuration

Initialize Database

  • Airflow stores its metadata in a database. Initialize it with the following command:
    • airflow db migrate

Create user

  • Create an Airflow user to access the web UI:
    • airflow users create \
        --username admin \
        --firstname FIRST_NAME \
        --lastname LAST_NAME \
        --role Admin \
        --email admin@example.com

Spin Up Server

  • Launch the Airflow web server and scheduler, each in its own terminal window.
    • airflow webserver --port 8080
    • airflow scheduler

Configure Airflow

Airflow Configuration file

  • The main configuration file is airflow.cfg, located in the Airflow home directory (~/airflow by default). Important settings to configure include:
    • DAGs folder: the directory where your DAG files (pipelines) are stored
      • dags_folder = /path/to/your/dags
    • Executor: the type of executor (e.g., SequentialExecutor, LocalExecutor, CeleryExecutor)
      • executor = LocalExecutor  # parallel execution
    • DB connection: the database where Airflow saves all its metadata
      • sql_alchemy_conn = postgresql+psycopg2://username:password@localhost/database_name

Project Directory Structure

Local environment → dags
  • dags
    • windsor_data_reddit_composer.py
    • windsor_data_tiktok_composer.py
    • windsor_data_snapchat_composer.py
    • windsor_data_linkedin_composer.py
    • windsor_data_facebook_composer.py
    • schema_data/
      • campaign_data/
        • reddit/
          • reddit_data.csv  (temp)
        • tiktok/
          • tiktok_data.csv  (temp)
        • snapchat/
          • snapchat_data.csv  (temp)
        • linkedin/
          • linkedin_data.csv  (temp)
        • fb/
          • facebook_data.csv  (temp)
      • table_schema/
        • tiktok_schema.json
        • snapchat_schema.json
        • reddit_schema.json
        • facebook_schema.json
        • linkedin_schema.json

BigQuery Schema Design

To store the data, a schema must be defined beforehand. As an example, consider the Snapchat schema, available at bucket → dags → schema_data → table_schema → snapchat_schema.json:

  • row_id: a field generated automatically that serves as the unique key for the row; the other fields are pulled from the API response based on what the company requires.
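A trimmed version of such a schema file might look like the fragment below, in BigQuery's JSON schema format (the field names beyond row_id are illustrative; the real file mirrors the Windsor API response):

```json
[
  {"name": "row_id",      "type": "STRING",  "mode": "REQUIRED"},
  {"name": "date",        "type": "DATE",    "mode": "NULLABLE"},
  {"name": "campaign_id", "type": "STRING",  "mode": "NULLABLE"},
  {"name": "impressions", "type": "INTEGER", "mode": "NULLABLE"},
  {"name": "spend",       "type": "FLOAT",   "mode": "NULLABLE"}
]
```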

BigQuery Database Design

BigQuery db design.
  • Raw Data
    • Dataset name: campaign
      • Table name
        • tiktok_campaign
        • snapchat_campaign
        • reddit_campaign
        • linkedin_campaign
        • facebook_campaign
  • Processed Data
    • Dataset name: campaign_analysis
      • Table name
        • src_fb_campaign
        • src_linkedin_campaign
        • src_tiktok_campaign
        • src_reddit_campaign
        • src_snapchat_campaign

 

  • Dimension Data
    • Dataset name: campaign_analysis_dim
      • Table name
        • dim_all_campaign
        • dim_linkedin_campaign
        • dim_tiktok_campaign
        • dim_reddit_campaign
        • dim_snapchat_campaign
        • dim_fb_campaign
  • Fact Data
    • Dataset name: campaign_analysis_fact
      • Table name
        • fact_fb_campaign
        • fact_linkedin_campaign
        • fact_tiktok_campaign
        • fact_reddit_campaign
        • fact_snapchat_campaign

Create DAG: The First Data Pipeline

dags → windsor_data_snapchat_composer.py

Create Connection

  • Create 2 connections: Airflow UI → Admin → Connections → click the + button to create a new connection
    • GCP connection
    • windsor_api_available

Create Variable

  • Create 4 variables: Airflow UI → Admin → Variables → click the + button to create a new variable
    • GCP project name
    • GCP project dataset name
    • Windsor API
    • Windsor API key

Code Implementation (Sequentially)

  • Import libraries
  • Get the dir path
    • data_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'schema_data')  # schema_data dir -> schema and campaign data
    • snapchat_dir = os.path.join(data_dir, 'campaign_data/snapchat/')  # snapchat dir
    • yesterday = date.today() - timedelta(days=1)  # capture the last day's data from the Windsor API
 
  • Set the pipeline configuration
    • default_args = {'owner': 'admin', 'email': ['user_mail@example.com'], 'retries': 5, 'retry_delay': timedelta(seconds=20)}
  • Main functionality
    • DAG init
    • Create/skip dataset and table
    • Save data and task flow
  • Helper functions
    • Fetch API data
    • Export API data as CSV
    • Save data to DB

By following these steps to set up, configure, and deploy Apache Airflow, you can efficiently orchestrate data pipelines both locally and in a cloud environment such as Google Cloud Composer. To learn more about data pipelines, see this guide on Apache Airflow.

At VivaSoft Limited, we leverage advanced data engineering technologies like Apache Airflow and Google BigQuery to optimize and streamline your data processes. Contact us to see how VivaSoft Limited can revolutionize your data management practices.
