Project Overview
Apache Airflow is a core component of modern data warehousing because it is an effective tool for orchestrating data pipelines. In this implementation stage we focus on using Airflow to manage and automate workflows, ensuring smooth data integration and transformation. Below is a summary of the main steps involved in setting up Apache Airflow. We recommend reading the first part of the series before proceeding.
Technology Used
Python, Apache Airflow, BigQuery, GCP Composer, dbt
Objectives
- Streamline Data Ingestion: Extract data from social media platforms such as Facebook, LinkedIn, TikTok, Snapchat, and Reddit, as well as from legacy databases such as MySQL and MSSQL.
- Parallel Data Processing: Process data concurrently to improve performance. Because each data source is independent, API data can be ingested into BigQuery for several sources at once, and multiple cleanup jobs that remove leftover CSV files from different directories can run at the same time.
- Error Handling and Fault Tolerance: Ensure the pipeline handles interruptions and failures gracefully. Airflow's built-in retry mechanism re-runs a task if it is interrupted or fails (see the sketch after this list).
- Audit Log: Airflow's log monitoring makes debugging and performance tracking easier by letting you view and analyze detailed logs for each task instance in the Airflow web UI.
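As a quick illustration of the retry mechanism mentioned above, here is a minimal sketch of task-level retry settings. The dag_id, task_id, and retry values are illustrative placeholders, not the project's actual configuration (the real DAG sets retries in default_args later in this post).

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.python import PythonOperator

with DAG(dag_id='retry_demo', start_date=datetime(2024, 1, 1),
         schedule=None, catchup=False):
    flaky_task = PythonOperator(
        task_id='flaky_task',
        python_callable=lambda: print('work that may fail transiently'),
        retries=3,                         # re-run the task up to 3 times on failure
        retry_delay=timedelta(minutes=2),  # wait between attempts
    )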
Technology Stack
1. Apache Airflow
- DAGs for Social Media Data Sources: A dedicated Directed Acyclic Graph (DAG) was created in Airflow to manage the extraction, loading, and transformation (ELT) activities for each social media data source (TikTok, Reddit, Snapchat, LinkedIn, Facebook). This approach keeps the pipelines modular and easy to manage.
- Parallel Data Processing: Because each data source is independent, Airflow can run the ELT tasks for the different sources in parallel, maximizing throughput and reducing processing time (see the sketch after this list).
- Fault Tolerance with Backup DAGs: Backup DAGs were implemented to guarantee data pipeline continuity and fault tolerance. In the event of disruptions or failures, these backup DAGs can resume operations immediately, reducing downtime and data loss.
- File Cleanup and Maintenance: Dedicated Airflow tasks perform regular file cleanup and maintenance to conserve storage and maintain data hygiene, ensuring that unnecessary or leftover files (such as temporary CSVs produced during ELT) are removed as soon as possible.
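To make the parallelism and cleanup points concrete, here is a minimal sketch with placeholder DAG and task names (not the project's production code): independent ingestion tasks are fanned out so the executor can run them concurrently, and a cleanup task runs once they have all finished.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator

with DAG(dag_id='parallel_ingest_demo', start_date=datetime(2024, 1, 1),
         schedule=None, catchup=False):
    extract_tasks = [
        PythonOperator(task_id=f'extract_{source}',
                       python_callable=lambda source=source: print(f'ingesting {source}'))
        for source in ('tiktok', 'reddit', 'snapchat', 'linkedin', 'facebook')
    ]
    cleanup = PythonOperator(task_id='cleanup_temp_csv',
                             python_callable=lambda: print('removing leftover CSV files'))
    # with LocalExecutor or CeleryExecutor the five extract tasks run in parallel;
    # cleanup starts only after all of them succeed
    extract_tasks >> cleanup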
2. Google BigQuery
- Columnar Storage: LinkedIn alone exposes more than 190 data sources, and some social media sources, such as Facebook, contain more than 500 data fields. BigQuery's columnar storage structure maximizes query performance by scanning only the columns a query actually needs, which is crucial in the adtech sector where instant insights over enormous datasets are expected.
- Partitioning and Clustering: BigQuery's partitioning and clustering mechanism is used to split the campaign tables by date, so queries scan only the relevant partitions (see the sketch after this list).
- Scheduling and storing options for SQL queries.
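As a sketch of how date-based partitioning and clustering can be set up with the BigQuery Python client, the DDL below creates a yearly-partitioned, clustered table. The project, dataset, table, and clustering column names are placeholders; the production tables in this project are created by an Airflow operator shown later.

from google.cloud import bigquery

client = bigquery.Client()
ddl = """
CREATE TABLE IF NOT EXISTS `my-project.campaign.tiktok_campaign_demo` (
  row_id      STRING,
  campaign_id STRING,
  spend       FLOAT64,
  date        DATE
)
PARTITION BY DATE_TRUNC(date, YEAR)  -- one partition per year, like the DAG below
CLUSTER BY campaign_id               -- co-locate rows of the same campaign within each partition
"""
client.query(ddl).result()  # run the DDL and wait for it to finish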
3. GCP Composer
- Dynamic Configuration Management: Cloud Composer's flexibility was used to quickly set up and modify Apache Airflow configuration overrides, Python package dependencies (PyPI libraries), and environment variables as needed. This made it possible to adapt to changing project requirements without downtime.
- Auto Scaling: Cloud Composer can automatically scale the number of Airflow workers with the workload, so parallel ELT runs get the capacity they need without leaving resources idle.
- Worker, Scheduler, and Webserver Configuration: Cloud Composer was used to configure and maintain the Airflow components, including workers for task execution, the scheduler for job scheduling, and the webserver for user interface interaction. This streamlined management speeds up deployment and maintenance.
- Trigger Configuration: Trigger settings were created in Cloud Composer to automate workflow execution based on predefined events or conditions. This improved workflow automation and efficiency while removing the need for manual intervention.
4. dbt
dbt will be covered in the final part of the series.
Top Level System Design
Setting Up Apache Airflow
Prepare the Environment
- Managing dependencies in a virtual environment is recommended.
python3 -m venv airflow_venv
source airflow_venv/bin/activate
Install Airflow
- Use pip to install Apache Airflow. It's important to pin the version and include any extras (provider packages) you need.
pip install apache-airflow==2.9.2
Initial Configuration
Initialize Database
- Airflow stores its metadata in a database. Initialize the database with the following command:
airflow db migrate
Create user
- Create an Airflow user to access the web UI
airflow users create --username admin --firstname FIRST_NAME --lastname LAST_NAME --role Admin --email admin@example.com
Spin Up Server
- Launch the Airflow webserver and scheduler, each in its own terminal window.
airflow webserver --port 8080
airflow scheduler
Configure Airflow
Airflow Configuration file
- The main configuration file is airflow.cfg, located in the Airflow home directory (~/airflow by default). Some important settings to configure include:
- DAGs folder: the directory where your DAG files (pipelines) are stored
- dags_folder = /path/to/your/dags
- Executor: the type of executor (e.g., SequentialExecutor, LocalExecutor, CeleryExecutor)
- executor = LocalExecutor # enables parallel task execution
- DB connection: the database where Airflow saves all its metadata
- sql_alchemy_conn = postgresql+psycopg2://username:password@localhost/database_name
Project Directory Structure
- dags/
  - windsor_data_reddit_composer.py
  - windsor_data_tiktok_composer.py
  - windsor_data_snapchat_composer.py
  - windsor_data_linkedin_composer.py
  - windsor_data_facebook_composer.py
  - schema_data/
    - campaign_data/
      - reddit/
        - reddit_data.csv (temp)
      - tiktok/
        - tiktok_data.csv (temp)
      - snapchat/
        - snapchat_data.csv (temp)
      - linkedin/
        - linkedin_data.csv (temp)
      - fb/
        - facebook_data.csv (temp)
    - table_schema/
      - tiktok_schema.json
      - snapchat_schema.json
      - reddit_schema.json
      - facebook_schema.json
      - linkedin_schema.json
BigQuery Schema Design
To store data, a schema needs to be defined beforehand. As an example, let's examine the Snapchat schema, available at bucket → dags → schema_data → table_schema → snapchat_schema.json:
[
{"name": "row_id", "type": "STRING", "mode": "REQUIRED", "description": "row unique id", "default_value_expression": "GENERATE_UUID()"},
{"name": "account_id", "type": "STRING", "mode": "NULLABLE", "description": ""},
{"name": "account_name", "type": "STRING", "mode": "NULLABLE", "description": ""},
{"name": "campaign", "type": "STRING", "mode": "NULLABLE", "description": ""},
{"name": "clicks", "type": "FLOAT64", "mode": "NULLABLE", "description": ""},
{"name": "spend", "type": "FLOAT64", "mode": "NULLABLE", "description": ""},
{"name": "impressions", "type": "FLOAT64", "mode": "NULLABLE", "description": ""},
{"name": "date", "type": "DATE", "mode": "NULLABLE", "description": ""},
{"name": "source", "type": "STRING", "mode": "NULLABLE", "description": ""},
{"name": "ad_id", "type": "STRING", "mode": "NULLABLE", "description": ""},
{"name": "ad_name", "type": "STRING", "mode": "NULLABLE", "description": ""},
{"name": "campaign_id", "type": "STRING", "mode": "NULLABLE", "description": ""},
{"name": "conversion_ad_click", "type": "FLOAT64", "mode": "NULLABLE", "description": ""},
{"name": "quartile_1", "type": "FLOAT64", "mode": "NULLABLE", "description": ""}
]
- row_id: this field is generated automatically and serves as the unique key for the row; the other fields are pulled from the API response based on what the company requires. (A small sketch of loading such a schema file follows.)
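As a small, optional sketch for sanity-checking a schema file locally (assuming a recent google-cloud-bigquery release), the field definitions can be parsed into SchemaField objects; the path follows the layout shown earlier.

import json
from google.cloud import bigquery

with open('dags/schema_data/table_schema/snapchat_schema.json') as f:
    schema = [bigquery.SchemaField.from_api_repr(field) for field in json.load(f)]
for field in schema:
    print(field.name, field.field_type, field.mode)  # e.g. row_id STRING REQUIRED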
BigQuery Database Design
- Raw Data
  - Dataset name: campaign
  - Tables:
    - tiktok_campaign
    - snapchat_campaign
    - reddit_campaign
    - linkedin_campaign
    - facebook_campaign
- Processed Data
  - Dataset name: campaign_analysis
  - Tables:
    - src_fb_campaign
    - src_linkedin_campaign
    - src_tiktok_campaign
    - src_reddit_campaign
    - src_snapchat_campaign
- Dimension Data
  - Dataset name: campaign_analysis_dim
  - Tables:
    - dim_all_campaign
    - dim_linkedin_campaign
    - dim_tiktok_campaign
    - dim_reddit_campaign
    - dim_snapchat_campaign
    - dim_fb_campaign
- Fact Data
  - Dataset name: campaign_analysis_fact
  - Tables:
    - fact_fb_campaign
    - fact_linkedin_campaign
    - fact_tiktok_campaign
    - fact_reddit_campaign
    - fact_snapchat_campaign
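A small sketch (not part of the DAG code) of pre-creating these datasets with the BigQuery Python client; the project id is a placeholder, and exists_ok makes the calls idempotent.

from google.cloud import bigquery

client = bigquery.Client(project='my-gcp-project')
for dataset_name in ('campaign', 'campaign_analysis',
                     'campaign_analysis_dim', 'campaign_analysis_fact'):
    client.create_dataset(dataset_name, exists_ok=True)  # no-op if the dataset already exists
    print(f'dataset ready: {dataset_name}')

In the pipeline itself, the raw campaign dataset and table are created by Airflow operators, as shown in the DAG below.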
Create DAG: The First Data Pipeline
dags → windsor_data_tiktok_composer.py
Create Connection
- Create 2 connections: Airflow UI → Admin → Connections → click the + button to create a new connection
- weselect_bigquery_gcp_conn (Google Cloud connection)
- windsor_api_available (HTTP connection to the Windsor API)
Create Variable
- Create 4 variables (consumed in the DAG as sketched below): Airflow UI → Admin → Variables → click the + button to create a new variable
- gcp_weselect_project (GCP project name)
- gcp_welect_bigquery_campaign_dataset (BigQuery campaign dataset name)
- windsor_api (Windsor API base URL)
- windsor_api_key (Windsor API key)
- The full script also reads a dag_bucket Variable holding the Composer bucket name.
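These Variables are consumed in two ways in the DAG code that follows: read directly in Python with Variable.get, or referenced through Jinja templating inside operator arguments. A minimal sketch of both forms:

from airflow.models import Variable

# resolved in Python when the DAG file is parsed or the task callable runs
windsor_api_key = Variable.get('windsor_api_key')

# Jinja-templated form, resolved by Airflow at run time inside templated operator
# fields such as project_id and dataset_id
project_id_template = '{{ var.value.gcp_weselect_project }}'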
Code Implementation (Sequentially)
- Import libraries
import os
import json
import requests
import pandas as pd
import traceback
from datetime import date, timedelta, datetime
from google.oauth2 import service_account
from google.cloud import bigquery
from airflow.models import DAG
from airflow.utils.task_group import TaskGroup
from airflow.providers.http.sensors.http import HttpSensor
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator,\
BigQueryCreateEmptyTableOperator
from airflow.operators.bash import BashOperator
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.edgemodifier import Label
- Get the dir path
dag_bucket = Variable.get('dag_bucket')
weselect_data_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'weSelect')
tiktok_dir = f'gs://{dag_bucket}/dags/weSelect/campaign_data/tiktok/'
yesterday = date.today() - timedelta(days = 1) # for last day data capture from windsor API
- Set the pipeline configuration
default_args = {
    'owner': 'weSelect_admin',
    'email': ['user_mail@example.com'],
    'retries': 5,
    'retry_delay': timedelta(seconds=120)
}
- DAG init
with DAG(dag_id='weSelect_windsor_tiktok_campaign_data_composer'
, tags=['gcp_bigquery', 'windsor', 'tiktok', 'campaign']
, default_args=default_args
, description=f'fetch all tiktok campaign data from Windsor API if available and save the {yesterday} data into GCP BigQuery'
, start_date=datetime(2024, 2, 24)
, schedule='0 5 * * *' # 5 AM
, catchup=False):
- Tasks
- Check API availability
# Tasks
# =====
#1. check API availability
is_windsor_tiktok_api_available = HttpSensor(task_id='is_windsor_api_available'
, http_conn_id='windsor_api_available'
, endpoint=f'tiktok?api_key={Variable.get("windsor_api_key")}&fields=campaign&date_from={yesterday}&date_to={yesterday}'
, response_check=lambda response: True if response.status_code == 200 else False
, method='GET'
, mode='poke'
, poke_interval=5
, timeout=60
)
- Fetch campaign data task
#2. fetch campaign data
check_api_data = BranchPythonOperator(task_id='check_api_data'
, python_callable=_check_api_data
, trigger_rule=TriggerRule.ALL_SUCCESS
)
- Empty-data task, executed when the check finds no data available in the API for the day
#3. empty data.
empty_data = PythonOperator(task_id='empty_data'
, python_callable=lambda: print(f"No data is available in API for {yesterday}")
)
- Dump campaign data
#4. dump campaign data
dump_campaign_data = PythonOperator(task_id='dump_campaign_data'
, python_callable=_dump_campaign_data
)
- Create BigQuery dataset and table
#5. create BigQuery dataset and table
with TaskGroup(group_id='create_bigquery_dataset_table'
, tooltip='create campaign dataset and tiktok_campaign table') as create_bigquery_dataset_table:
#5.1 create dataset
create_campaign_data_dataset = BigQueryCreateEmptyDatasetOperator(task_id='create_campaign_data_dataset',
gcp_conn_id='weselect_bigquery_gcp_conn',
project_id='{{ var.value.gcp_weselect_project}}',
dataset_id='{{ var.value.gcp_welect_bigquery_campaign_dataset}}',
if_exists='ignore'
)
#5.2 create table
with open(f'{weselect_data_dir}/table_schema/tiktok_schema.json') as f: # tiktok_campaign table schema
schema = json.load(f)
create_tiktok_campaign_table = BigQueryCreateEmptyTableOperator(task_id='create_tiktok_campaign_table',
gcp_conn_id='weselect_bigquery_gcp_conn',
project_id='{{ var.value.gcp_weselect_project}}',
dataset_id='{{ var.value.gcp_welect_bigquery_campaign_dataset}}',
table_id='tiktok_campaign',
if_exists='ignore',
schema_fields=schema,
time_partitioning={
"type": "YEAR",
"field": "date"
}
)
# dataset and table taskgroup taskflow
create_campaign_data_dataset >> create_tiktok_campaign_table
- Insert data into BigQuery
# #6. insert data into BigQuery
upload_data = PythonOperator(task_id='upload_data'
, python_callable=upload_data_from_csv)
- Taskflow
# Taskflow
[create_bigquery_dataset_table]  # the dataset/table task group has no external dependencies and runs independently
is_windsor_tiktok_api_available \
>> Label('check windsor tiktok API is available') >> check_api_data
check_api_data >> Label('data unavailable') >> empty_data
check_api_data >> Label('data available') >> dump_campaign_data \
>> Label('export API data as csv') >> upload_data \
>> Label('append data to BigQuery')
- Helper functions
- Fetch API data
def _check_api_data(ti):
"""
check windsor tiktok api data is available or not
"""
api_endpoint = f"{Variable.get('windsor_api')}/tiktok?api_key={Variable.get('windsor_api_key')}&fields=ctr%2Caccount_id%2Caccount_name%2Cad_name%2Ccampaign%2Ccampaign_name%2Ccampaign_id%2Cclicks%2Cdate%2Cduet_clicks%2Cimpressions%2Cplay_first_quartile%2Creach%2Csound_usage_clicks%2Cspend%2Cstitch_clicks%2Cad_id&date_from={yesterday}&date_to={yesterday}"
response = requests.get(api_endpoint)
data = json.loads(response.content)['data']
print(f"data volume: {len(data)}")
if len(data) > 0: # if data is available then export data as csv
ti.xcom_push(key='campaign_data', value=data)
return 'dump_campaign_data'
else: # otherwise, skip
return 'empty_data'
- Export API data as csv
def _dump_campaign_data(ti):
    """
    export campaign data as csv
    """
    campaign_data = ti.xcom_pull('check_api_data', key='campaign_data')  # pull the API data from XCom
    df = pd.DataFrame(campaign_data)  # convert to a pandas dataframe
    file_name = f'campaign_data_{datetime.now()}.csv'
    print(f"total campaign data: {len(campaign_data)}")
    df.to_csv(f'{tiktok_dir}{file_name}', index=False)  # writing to a gs:// path requires gcsfs
    print('Data is exported as CSV successfully.')
- Save data to DB
def upload_data_from_csv():
    """
    fetch data from the csv files and upload to the BigQuery table
    """
    import gcsfs  # used to list the CSV files stored in the GCS bucket

    bgquery_client = bigquery.Client()  # BigQuery client
    # load_table_from_dataframe serialises the dataframe itself, so only the
    # write disposition needs to be configured (rows are appended to the table)
    job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
    try:
        # pandas cannot expand wildcards, so list the matching CSV files first
        fs = gcsfs.GCSFileSystem()
        csv_files = [f'gs://{path}' for path in fs.glob(f'{tiktok_dir}*.csv')]
        df = pd.concat((pd.read_csv(csv_file) for csv_file in csv_files), ignore_index=True)
        df = df.where(pd.notnull(df), None)  # convert pandas NaN to SQL NULL
        print(f"data: {df.head(5)}")
        job = bgquery_client.load_table_from_dataframe(
            df,
            f"{Variable.get('gcp_weselect_project')}.{Variable.get('gcp_welect_bigquery_campaign_dataset')}.tiktok_campaign",
            job_config=job_config)
        job.result()  # waits for the load job to complete
        print("Data insertion is successful in the tiktok_campaign table.")
    except Exception:
        print("Something went wrong with the CSV files or the tiktok_campaign table.")
        print(f"Error: {traceback.format_exc()}")
Full-code Version
"""
Variables are used in this script:
==================================
1. windsor_api_key
2. windsor_api
3. gcp_weselect_project
4. gcp_welect_bigquery_campaign_dataset
Connections are used in this script:
==================================
1. windsor_api_available
2. weselect_bigquery_gcp_conn
"""
import os
import json
import requests
import pandas as pd
import traceback
from datetime import date, timedelta, datetime
from google.oauth2 import service_account
from google.cloud import bigquery
from airflow.models import DAG
from airflow.utils.task_group import TaskGroup
from airflow.providers.http.sensors.http import HttpSensor
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator,\
BigQueryCreateEmptyTableOperator
from airflow.operators.bash import BashOperator
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.edgemodifier import Label
dag_bucket = Variable.get('dag_bucket')
weselect_data_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'weSelect')
tiktok_dir = f'gs://{dag_bucket}/dags/weSelect/campaign_data/tiktok/'
yesterday = date.today() - timedelta(days = 1) # for last day data capture from windsor API
default_args = {
    'owner': 'weSelect_admin',
    'email': ['user_mail@example.com'],
    'retries': 5,
    'retry_delay': timedelta(seconds=120)
}
def _check_api_data(ti):
"""
check windsor tiktok api data is available or not
"""
api_endpoint = f"{Variable.get('windsor_api')}/tiktok?api_key={Variable.get('windsor_api_key')}&fields=ctr%2Caccount_id%2Caccount_name%2Cad_name%2Ccampaign%2Ccampaign_name%2Ccampaign_id%2Cclicks%2Cdate%2Cduet_clicks%2Cimpressions%2Cplay_first_quartile%2Creach%2Csound_usage_clicks%2Cspend%2Cstitch_clicks%2Cad_id&date_from={yesterday}&date_to={yesterday}"
response = requests.get(api_endpoint)
data = json.loads(response.content)['data']
print(f"data volume: {len(data)}")
if len(data) > 0: # if data is available then export data as csv
ti.xcom_push(key='campaign_data', value=data)
return 'dump_campaign_data'
else: # otherwise, skip
return 'empty_data'
def _dump_campaign_data(ti):
    """
    export campaign data as csv
    """
    campaign_data = ti.xcom_pull('check_api_data', key='campaign_data')  # pull the API data from XCom
    df = pd.DataFrame(campaign_data)  # convert to a pandas dataframe
    file_name = f'campaign_data_{datetime.now()}.csv'
    print(f"total campaign data: {len(campaign_data)}")
    df.to_csv(f'{tiktok_dir}{file_name}', index=False)  # writing to a gs:// path requires gcsfs
    print('Data is exported as CSV successfully.')
def upload_data_from_csv():
    """
    fetch data from the csv files and upload to the BigQuery table
    """
    import gcsfs  # used to list the CSV files stored in the GCS bucket

    bgquery_client = bigquery.Client()  # BigQuery client
    # load_table_from_dataframe serialises the dataframe itself, so only the
    # write disposition needs to be configured (rows are appended to the table)
    job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
    try:
        # pandas cannot expand wildcards, so list the matching CSV files first
        fs = gcsfs.GCSFileSystem()
        csv_files = [f'gs://{path}' for path in fs.glob(f'{tiktok_dir}*.csv')]
        df = pd.concat((pd.read_csv(csv_file) for csv_file in csv_files), ignore_index=True)
        df = df.where(pd.notnull(df), None)  # convert pandas NaN to SQL NULL
        print(f"data: {df.head(5)}")
        job = bgquery_client.load_table_from_dataframe(
            df,
            f"{Variable.get('gcp_weselect_project')}.{Variable.get('gcp_welect_bigquery_campaign_dataset')}.tiktok_campaign",
            job_config=job_config)
        job.result()  # waits for the load job to complete
        print("Data insertion is successful in the tiktok_campaign table.")
    except Exception:
        print("Something went wrong with the CSV files or the tiktok_campaign table.")
        print(f"Error: {traceback.format_exc()}")
with DAG(dag_id='weSelect_windsor_tiktok_campaign_data_composer'
, tags=['gcp_bigquery', 'windsor', 'tiktok', 'campaign']
, default_args=default_args
, description=f'fetch all tiktok campaign data from Windsor API if available and save the {yesterday} data into GCP BigQuery'
, start_date=datetime(2024, 2, 24)
, schedule='0 5 * * *' # 5 AM
, catchup=False):
# Tasks
# =====
#1. check API availability
is_windsor_tiktok_api_available = HttpSensor(task_id='is_windsor_api_available'
, http_conn_id='windsor_api_available'
, endpoint=f'tiktok?api_key={Variable.get("windsor_api_key")}&fields=campaign&date_from={yesterday}&date_to={yesterday}'
, response_check=lambda response: True if response.status_code == 200 else False
, method='GET'
, mode='poke'
, poke_interval=5
, timeout=60
)
#2. fetch campaign data
check_api_data = BranchPythonOperator(task_id='check_api_data'
, python_callable=_check_api_data
, trigger_rule=TriggerRule.ALL_SUCCESS
)
#3. empty data
empty_data = PythonOperator(task_id='empty_data'
, python_callable=lambda: print(f"No data is available in API for {yesterday}")
)
#4. dump campaign data
dump_campaign_data = PythonOperator(task_id='dump_campaign_data'
, python_callable=_dump_campaign_data
)
#5. create BigQuery dataset and table
with TaskGroup(group_id='create_bigquery_dataset_table'
, tooltip='create campaign dataset and tiktok_campaign table') as create_bigquery_dataset_table:
#5.1 create dataset
create_campaign_data_dataset = BigQueryCreateEmptyDatasetOperator(task_id='create_campaign_data_dataset',
gcp_conn_id='weselect_bigquery_gcp_conn',
project_id='{{ var.value.gcp_weselect_project}}',
dataset_id='{{ var.value.gcp_welect_bigquery_campaign_dataset}}',
if_exists='ignore'
)
#5.2 create table
with open(f'{weselect_data_dir}/table_schema/tiktok_schema.json') as f: # tiktok_campaign table schema
schema = json.load(f)
create_tiktok_campaign_table = BigQueryCreateEmptyTableOperator(task_id='create_tiktok_campaign_table',
gcp_conn_id='weselect_bigquery_gcp_conn',
project_id='{{ var.value.gcp_weselect_project}}',
dataset_id='{{ var.value.gcp_welect_bigquery_campaign_dataset}}',
table_id='tiktok_campaign',
if_exists='ignore',
schema_fields=schema,
time_partitioning={
"type": "YEAR",
"field": "date"
}
)
# dataset and table taskgroup taskflow
create_campaign_data_dataset >> create_tiktok_campaign_table
# #6. insert data into BigQuery
upload_data = PythonOperator(task_id='upload_data'
, python_callable=upload_data_from_csv)
# Taskflow
[create_bigquery_dataset_table]  # the dataset/table task group has no external dependencies and runs independently
is_windsor_tiktok_api_available \
>> Label('check windsor tiktok API is available') >> check_api_data
check_api_data >> Label('data unavailable') >> empty_data
check_api_data >> Label('data available') >> dump_campaign_data \
>> Label('export API data as csv') >> upload_data \
>> Label('append data to BigQuery')
By following these steps to set up, configure, and deploy Apache Airflow, you can efficiently orchestrate data pipelines both locally and in a cloud environment such as Google Cloud Composer. Learn more about data pipelines in this guide on Apache Airflow.
At VivaSoft Limited, we leverage advanced data engineering technologies like Apache Airflow and Google BigQuery to optimize and streamline your data processes. Contact us to see how VivaSoft Limited can revolutionize your data management practices.