Automate Microsoft Teams Alerts with Apache Airflow

Last Update: October 1, 2024
Contributors: Vivasoft Team

Introduction

Effective workflow automation is essential for managing data pipelines. Apache Airflow, a widely used tool for orchestrating complex data workflows, lets teams schedule and track jobs easily. Those workflows still depend on effective monitoring and prompt notifications.
This blog post describes how to send real-time notifications to Microsoft Teams when an Apache Airflow pipeline finishes running. By sending Teams notifications for both successes and failures, teams can improve communication and resolve problems promptly.

Key Points

  • Setting Up Microsoft Teams
  • Implementing Notification Functionality
  • Integrating Notifications in Airflow DAGs

Technology Used

Apache Airflow, MS Teams Webhook

Benefits of Automating Microsoft Teams Alerts with Airflow

Combining Apache Airflow with Microsoft Teams notifications can make your data engineering activities more reliable and efficient:

  1. Real-Time Notifications
  2. Enhanced Collaboration
  3. Centralized Monitoring
  4. Customizable Alerts
  5. Improved Accountability
  6. Streamlined Workflow Management
  7. Integration with Other Tools
  8. Reduced Notification Fatigue
  9. Enhanced Incident Response

Top level workflow

Fig 1: MS Teams notifications using Apache Airflow

Setting Up the Environment

    • Prerequisites
      • MS Teams Channel
      • Apache Airflow Installed
      • MS Teams Channel Webhook

    • Configurations
      • Setting Up MS Teams Webhook
        • Log in to MS Teams and click Teams in the left sidebar
        • Create a new channel by clicking the three-dot button
        • Navigate to the newly created channel
          • Click the three-dot button and select Manage Channel
          • Go to Settings in the top navigation bar
          • Click Edit under Connectors
          • Search for Incoming Webhook and click Configure
          • Set a name and click Create
          • Copy the webhook URL and save it in an Airflow Variable named ms_teams_webhook (the key the notification functions read)
          • Done
  • Project Structure
    ├── dags
    │   └── utility
    │       └── ms_teams_notifications.py
    └── example_dag.py
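
Before wiring the webhook into Airflow, it can be worth confirming that the copied URL accepts a message at all. The following is a minimal sketch using only the Python standard library; the function names `build_test_message` and `post_to_webhook` are hypothetical helpers introduced here for illustration, and the sketch assumes the Incoming Webhook accepts a plain JSON body with a single `text` field.

```python
import json
import urllib.request


def build_test_message(text: str) -> bytes:
    """Encode a minimal plain-text Teams message as a JSON request body."""
    return json.dumps({"text": text}).encode("utf-8")


def post_to_webhook(webhook_url: str, body: bytes, timeout: float = 10.0) -> int:
    """POST the body to the webhook and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Usage (replace the placeholder with the URL copied from Teams):
# post_to_webhook("<your-webhook-url>", build_test_message("Test from the Airflow host"))
```

If the test message shows up in the channel, the URL can be stored with the Airflow CLI, for example `airflow variables set ms_teams_webhook <your-webhook-url>`, or through the Variables page in the Airflow UI.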

Function for MS Teams Success Notification

				
import requests
from airflow.models import Variable


def send_success_notification_teams_message(context) -> None:
    """
    Send a success notification to Microsoft Teams.
    """
    task_instance = context.get('task_instance')
    task_id = task_instance.task_id
    dag_id = task_instance.dag_id
    execution_date = context.get('logical_date')  # execution_date is deprecated in newer Airflow versions
    log_url = task_instance.log_url

    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "summary": "Summary",
        "themeColor": "00FF00",  # Green for success
        "sections": [{
            "activityTitle": "GCP Composer Service",
            "activitySubtitle": "Airflow Notification",
            "facts": [
                {
                    "name": "Airflow Task Success",
                    "value": f"Task: {task_id}, DAG: {dag_id}.\n Execution Date: {execution_date} \n Log URL: {log_url}"
                }
            ],
            "text": "Have a Good Day 🙂"
        }],
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "View Log",
            "targets": [{
                "os": "default",
                "uri": log_url  # open the task log rather than a placeholder link
            }]
        }]
    }
    headers = {"content-type": "application/json"}
    try:
        response = requests.post(Variable.get("ms_teams_webhook"),
                                 json=payload,
                                 headers=headers,
                                 timeout=10)  # do not let a slow endpoint hang the callback
        response.raise_for_status()  # treat HTTP 4xx/5xx responses as errors
        print("Notification sent successfully")
    except Exception as e:
        print(f"Notification error: {str(e)}")

Function for MS Teams Failure Notification

				
import requests
from airflow.models import Variable


def send_fail_notification_teams_message(context) -> None:
    """
    Send a failure notification to Microsoft Teams.
    """
    task_instance = context.get('task_instance')
    task_id = task_instance.task_id
    dag_id = task_instance.dag_id
    execution_date = context.get('logical_date')  # execution_date is deprecated in newer Airflow versions
    log_url = task_instance.log_url

    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "summary": "Summary",
        "themeColor": "FF0000",  # Red for failure
        "sections": [{
            "activityTitle": "GCP Composer Service",
            "activitySubtitle": "Airflow Notification",
            "facts": [
                {
                    "name": "Airflow Task Failed",
                    "value": f"Task: {task_id}, DAG: {dag_id}.\n Execution Date: {execution_date} \n Log URL: {log_url}"
                }
            ],
            "text": "Have a Good Day 🙂"
        }],
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "View Log",
            "targets": [{
                "os": "default",
                "uri": log_url  # open the task log rather than a placeholder link
            }]
        }]
    }
    headers = {"content-type": "application/json"}
    try:
        response = requests.post(Variable.get("ms_teams_webhook"),
                                 json=payload,
                                 headers=headers,
                                 timeout=10)  # do not let a slow endpoint hang the callback
        response.raise_for_status()  # treat HTTP 4xx/5xx responses as errors
        print("Notification sent successfully")
    except Exception as e:
        print(f"Notification error: {str(e)}")
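
The success and failure functions differ only in the theme color and the fact title, so the card could be built in one place. The following is a rough sketch of that idea, not the author's implementation: `STATUS_STYLES` and `build_teams_payload` are hypothetical names, and splitting the details into separate facts is a design choice made here for readability.

```python
from typing import Any, Dict

# Hypothetical lookup table: one entry per notification type.
STATUS_STYLES = {
    "success": {"color": "00FF00", "title": "Airflow Task Success"},
    "failed": {"color": "FF0000", "title": "Airflow Task Failed"},
}


def build_teams_payload(status: str, task_id: str, dag_id: str,
                        logical_date: Any, log_url: str) -> Dict[str, Any]:
    """Build a MessageCard payload for the given status ('success' or 'failed')."""
    style = STATUS_STYLES[status]  # raises KeyError for an unknown status
    return {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "summary": style["title"],
        "themeColor": style["color"],
        "sections": [{
            "activityTitle": "GCP Composer Service",
            "activitySubtitle": "Airflow Notification",
            # One fact per field instead of a single multi-line value.
            "facts": [
                {"name": "Status", "value": style["title"]},
                {"name": "Task", "value": task_id},
                {"name": "DAG", "value": dag_id},
                {"name": "Execution Date", "value": str(logical_date)},
                {"name": "Log URL", "value": log_url},
            ],
        }],
    }
```

Each callback would then read its values from `context`, call `build_teams_payload("success", ...)` or `build_teams_payload("failed", ...)`, and post the result to the webhook as before.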

				
			

Configuration in the Airflow DAG

				
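from datetime import timedelta

from airflow.models import Variable
# Module name matches utility/ms_teams_notifications.py in the project structure
from utility.ms_teams_notifications import send_fail_notification_teams_message, \
    send_success_notification_teams_message

# The Variable keys below are assumptions; use the keys defined in your Airflow instance
NO_OF_RETRY = int(Variable.get("no_of_retry"))
TASK_RETRY_DELAY_IN_SECONDS = int(Variable.get("task_retry_delay_in_seconds"))

default_args = {
    'owner': 'admin',
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': NO_OF_RETRY,  # from Airflow Variable
    'retry_delay': timedelta(seconds=TASK_RETRY_DELAY_IN_SECONDS),  # from Airflow Variable
    'on_failure_callback': send_fail_notification_teams_message,  # called when a task fails
    'on_success_callback': send_success_notification_teams_message  # called when a task succeeds
}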
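
Airflow calls each callback with a single context dictionary. To check which keys a callback reads without starting a scheduler, one option is to hand it a stand-in context. The sketch below uses only the standard library; `make_fake_context` and `describe_context` are hypothetical helpers, and the stand-in carries only the attributes the notification functions use (`task_id`, `dag_id`, `log_url`, and the `logical_date` key).

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def make_fake_context(task_id: str, dag_id: str, log_url: str) -> dict:
    """Build a stand-in for the context dict that Airflow passes to callbacks."""
    task_instance = SimpleNamespace(task_id=task_id, dag_id=dag_id, log_url=log_url)
    return {
        "task_instance": task_instance,
        "logical_date": datetime(2024, 10, 1, tzinfo=timezone.utc),
    }


def describe_context(context: dict) -> str:
    """Read the same keys the notification callbacks read and format them."""
    task_instance = context.get("task_instance")
    return (
        f"Task: {task_instance.task_id}, DAG: {task_instance.dag_id}, "
        f"Execution Date: {context.get('logical_date')}, "
        f"Log URL: {task_instance.log_url}"
    )


context = make_fake_context("extract", "example_dag", "http://localhost:8080/log")
print(describe_context(context))
```

The same stand-in context could be passed to either notification function in an environment where Airflow is installed and the webhook Variable is set.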

				
			

Conclusion

Integrating Apache Airflow with Microsoft Teams webhooks for success and failure notifications is an effective way to improve workflow monitoring and team collaboration.

Real-time alerts inform your team right away of problems or successes in your data pipelines, which allows quicker reaction times and more effective operations.

This integration centralizes alerts on a familiar platform and also encourages better teamwork and accountability. Customized alerts give each notification the essential context, so your team can respond to problems clearly and precisely.

Integrating Microsoft Teams notifications into your Airflow workflows reduces manual oversight and streamlines monitoring, which can improve the reliability and robustness of your data pipeline infrastructure. Adopt this integration to keep your team aligned and your data operations efficient.
