Introduction
Workflow automation is essential for managing data pipelines. Apache Airflow, a widely used tool for orchestrating complex data workflows, lets teams schedule and track jobs with little manual effort. Those workflows still depend on effective monitoring and prompt notifications: someone has to find out quickly when a job fails.
This blog post describes how to set up a notification system that posts a message to a Microsoft Teams channel in real time when tasks in an Apache Airflow pipeline succeed or fail. With notifications for both successful completions and errors, teams can communicate better and resolve problems promptly.
Automate Microsoft Teams Alerts with Apache Airflow
Key Points
- Setting Up Microsoft Teams
- Implementing Notification Functionality
- Integrating Notifications in Airflow DAGs
Technology Used
Apache Airflow, MS Teams Webhook
Benefits of Automating Microsoft Teams Alerts with Airflow
Sending Apache Airflow notifications to Microsoft Teams can make your data engineering operations more reliable and easier to monitor.
- Real-Time Notifications
- Enhanced Collaboration
- Centralized Monitoring
- Customizable Alerts
- Improved Accountability
- Streamlined Workflow Management
- Integration with Other Tools
- Reduced Notification Fatigue
- Enhanced Incident Response
Top-Level Workflow
Fig 1: MS Teams notifications using Apache Airflow
Setting Up the Environment
- Prerequisites
  - MS Teams Channel
  - Apache Airflow Installed
  - MS Teams Channel Webhook
- Configurations
  - Setting Up MS Teams Webhook
    - Log in to MS Teams and click Teams in the left sidebar
    - Create a new channel by clicking the three-dot button
    - Navigate to the newly created channel
    - Click the three-dot button and select Manage Channel
    - Go to Settings in the top navbar
    - Click Edit under Connectors
    - Search for Incoming Webhook and click Configure
    - Set a name and click Create
    - Copy the webhook URL and save it in an Airflow Variable named ms_teams_webhook
- Project Structure
├── dags
│   ├── utility
│   │   └── ms_teams_notifications.py
│   └── example_dag.py
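Before wiring the webhook into Airflow, it can help to confirm that the copied URL accepts messages. The following is a minimal sketch using only the Python standard library; `build_test_card` and `post_card` are hypothetical helper names introduced here, not part of Airflow or Teams, and the 10-second timeout is an arbitrary choice.

```python
import json
import urllib.request


def build_test_card(text: str) -> dict:
    """Return a minimal MessageCard payload for a connectivity check."""
    return {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "summary": "Webhook test",
        "text": text,
    }


def post_card(webhook_url: str, card: dict, timeout: float = 10.0) -> int:
    """POST the card as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(card).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Usage (replace the placeholder with the URL copied from the connector):
# post_card("<your-webhook-url>", build_test_card("Hello from the webhook test"))
```

If the call returns a 2xx status and the message shows up in the channel, the URL is ready to be stored in the Airflow Variable.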
Function for MS Teams Success Notification
import requests
from airflow.models import Variable


def send_success_notification_teams_message(context) -> None:
    """Send a success notification to Microsoft Teams."""
    task_instance = context.get('task_instance')
    task_id = task_instance.task_id
    dag_id = task_instance.dag_id
    # 'logical_date' replaces the deprecated 'execution_date' context key
    execution_date = context.get('logical_date')
    log_url = task_instance.log_url
    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "summary": "Summary",
        "themeColor": "00FF00",  # Green color for success
        "sections": [{
            "activityTitle": "GCP Composer Service",
            "activitySubtitle": "Airflow Notification",
            "facts": [
                {
                    "name": "Airflow Task Success",
                    "value": f"Task: {task_id}, DAG: {dag_id}.\n Execution Date: {execution_date} \n Log URL: {log_url}"
                }
            ],
            "text": "Have a Good Day 🙂"
        }],
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "Link name",
            "targets": [{
                "os": "default",
                "uri": "https://www.google.com/"
            }]
        }]
    }
    headers = {"content-type": "application/json"}
    try:
        response = requests.post(Variable.get("ms_teams_webhook"),
                                 json=payload,
                                 headers=headers,
                                 timeout=10)
        # Treat 4xx/5xx responses as errors instead of reporting success
        response.raise_for_status()
        print("Notification sent successfully")
    except Exception as e:
        # A notification problem is logged but must not break the callback
        print(f"Notification error: {e}")
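The function above reads only two keys from the context: `task_instance` (for `task_id`, `dag_id`, and `log_url`) and `logical_date`. That makes it possible to try the callback outside a DAG run with a stand-in context. The sketch below shows one way to build such a stand-in; `make_fake_context` is a hypothetical helper, and the date and URL are made-up sample values.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def make_fake_context(task_id: str, dag_id: str, log_url: str) -> dict:
    """Build a stand-in for the context dict Airflow passes to callbacks."""
    # SimpleNamespace gives attribute access like a real TaskInstance,
    # but carries only the three attributes the callback reads.
    task_instance = SimpleNamespace(task_id=task_id, dag_id=dag_id, log_url=log_url)
    return {
        "task_instance": task_instance,
        "logical_date": datetime(2024, 1, 1, tzinfo=timezone.utc),
    }


context = make_fake_context("extract", "example_dag", "http://localhost:8080/log")
task_instance = context.get("task_instance")
print(task_instance.task_id, task_instance.dag_id, context.get("logical_date"))
```

In an environment where Airflow is importable and the `ms_teams_webhook` Variable is set, passing this context to `send_success_notification_teams_message` should post a sample card to the channel.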
Function for MS Teams Failure Notification
import requests
from airflow.models import Variable


def send_fail_notification_teams_message(context) -> None:
    """Send a failure notification to Microsoft Teams."""
    task_instance = context.get('task_instance')
    task_id = task_instance.task_id
    dag_id = task_instance.dag_id
    # 'logical_date' replaces the deprecated 'execution_date' context key
    execution_date = context.get('logical_date')
    log_url = task_instance.log_url
    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "summary": "Summary",
        "themeColor": "FF0000",  # Red color for failure
        "sections": [{
            "activityTitle": "GCP Composer Service",
            "activitySubtitle": "Airflow Notification",
            "facts": [
                {
                    "name": "Airflow Task Failed",
                    "value": f"Task: {task_id}, DAG: {dag_id}.\n Execution Date: {execution_date} \n Log URL: {log_url}"
                }
            ],
            "text": "Have a Good Day 🙂"
        }],
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "Link name",
            "targets": [{
                "os": "default",
                "uri": "https://www.google.com/"
            }]
        }]
    }
    headers = {"content-type": "application/json"}
    try:
        response = requests.post(Variable.get("ms_teams_webhook"),
                                 json=payload,
                                 headers=headers,
                                 timeout=10)
        # Treat 4xx/5xx responses as errors instead of reporting success
        response.raise_for_status()
        print("Notification sent successfully")
    except Exception as e:
        # A notification problem is logged but must not break the callback
        print(f"Notification error: {e}")
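The success and failure functions differ only in the theme color and the fact name, so the payload could be produced by one shared builder. The following is a sketch of that idea, not the post's implementation: `build_message_card` is a hypothetical helper, and pointing the OpenUri action at the task log (with the label "View Log") is an assumption that departs from the placeholder link used above.

```python
def build_message_card(status, task_id, dag_id, execution_date, log_url) -> dict:
    """Build the MessageCard payload for a 'success' or 'failure' status."""
    # Theme color and fact name are the only parts that vary by status.
    styles = {
        "success": ("00FF00", "Airflow Task Success"),
        "failure": ("FF0000", "Airflow Task Failed"),
    }
    if status not in styles:
        raise ValueError(f"unknown status: {status!r}")
    theme_color, fact_name = styles[status]
    return {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "summary": fact_name,
        "themeColor": theme_color,
        "sections": [{
            "activityTitle": "GCP Composer Service",
            "activitySubtitle": "Airflow Notification",
            "facts": [{
                "name": fact_name,
                "value": f"Task: {task_id}, DAG: {dag_id}.\n Execution Date: {execution_date} \n Log URL: {log_url}",
            }],
        }],
        # Assumption: link the action button to the task log.
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "View Log",
            "targets": [{"os": "default", "uri": log_url}],
        }],
    }
```

Each callback would then reduce to extracting the fields from the context, calling the builder with the matching status, and posting the result.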
Configuration in the Airflow DAG
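from datetime import timedelta

from airflow.models import Variable
from utility.ms_teams_notifications import (
    send_fail_notification_teams_message,
    send_success_notification_teams_message,
)

# Retry settings come from Airflow Variables, which are stored as strings.
# The Variable key names below are assumptions; use the keys from your environment.
NO_OF_RETRY = int(Variable.get("NO_OF_RETRY"))
TASK_RETRY_DELAY_IN_SECONDS = int(Variable.get("TASK_RETRY_DELAY_IN_SECONDS"))

default_args = {
    'owner': 'admin',
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': NO_OF_RETRY,
    'retry_delay': timedelta(seconds=TASK_RETRY_DELAY_IN_SECONDS),
    'on_failure_callback': send_fail_notification_teams_message,
    'on_success_callback': send_success_notification_teams_message,
}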
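The comments in `default_args` note that the retry count and retry delay come from Airflow Variables. `Variable.get` returns a string unless JSON deserialization is requested, so the values need converting before they are used. A minimal sketch of that conversion follows; `parse_retry_settings` is a hypothetical helper, and the sample inputs stand in for whatever the Variables hold.

```python
from datetime import timedelta


def parse_retry_settings(raw_retries: str, raw_delay_seconds: str) -> dict:
    """Convert string-valued settings into the types default_args expects."""
    retries = int(raw_retries)
    delay_seconds = int(raw_delay_seconds)
    if retries < 0 or delay_seconds < 0:
        raise ValueError("retry settings must be non-negative")
    return {
        "retries": retries,
        "retry_delay": timedelta(seconds=delay_seconds),
    }


# Sample values standing in for the strings read from Airflow Variables.
settings = parse_retry_settings("3", "60")
print(settings)
```

The returned dict can be merged into `default_args`, for example with `default_args.update(settings)`.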
Conclusion
Integrating Apache Airflow with Microsoft Teams webhooks for success and error notifications is an effective way to improve workflow monitoring and team collaboration.
With real-time alerts in place, your team learns right away about problems or successes in your data pipelines, which allows for quicker reaction times and more effective operations.
This integration centralizes alerts on a platform the team already uses and encourages better teamwork and accountability. Customized alerts give each notification the context it needs (task, DAG, execution date, and log URL), so your team can respond to problems clearly and precisely.
By reducing manual oversight and streamlining monitoring, Microsoft Teams notifications in your Airflow workflows can improve the reliability and robustness of your data pipeline infrastructure. Adopt this integration to keep your team aligned and your data operations efficient.