Azure Data Factory Pipeline Trigger

Last Update: September 27, 2024
Contributors
Vivasoft Team

Azure Data Factory (ADF) is a fully managed, serverless data integration service designed to handle the ingestion, preparation, and transformation of large volumes of data. An Azure subscription can contain one or more Azure Data Factory instances, referred to as data factories.

The pipeline is one of the core components of Azure Data Factory. In Azure Data Factory and Azure Synapse, each execution of a pipeline is called a pipeline run.

A pipeline run is typically started by passing arguments to the parameters defined in the pipeline. You can run a pipeline either manually or through a trigger.

This article focuses on running a pipeline on demand from application code and tracking its status. In this example, we have a list of pipelines obtained from the Azure API, and each pipeline in the list can be executed. When the “run” button is clicked, the status of the run changes through several stages.

Initially, it displays as “Queued,” then transitions to “InProgress,” and finally shows as “Succeeded.” However, a run can also end with the “Failed” status. In this example, the back end is an ASP.NET Core web application.

Figure: Pipeline trigger flow diagram

Now, let’s discuss the underlying logic used to start the pipeline and update its status. Before triggering a pipeline, the controller retrieves all the pipeline names from the Azure API.

The retrieved data is used to populate the Pipeline entity so that it can be displayed in the grid.

Prerequisites

1. .NET 8.0 (or any compatible .NET version).

2. SQL Server (or any other compatible database).

3. Visual Studio or Visual Studio Code.

Create a Web API project

Create a Web API project named AzureDataFactoryPipeline by following the steps below:

Step 1:

Enter AzureDataFactoryPipeline as the project name.

Step 2:

Select the framework, then enable “Use controllers” and Swagger/OpenAPI support.

Step 3:

Figure: The AzureDataFactoryPipeline solution after it has been created

Add Necessary NuGet Packages

A few NuGet packages need to be installed. After installing them, the application’s .csproj file lists all of the packages:

				
					<Project Sdk="Microsoft.NET.Sdk.Web">
  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <Nullable>enable</Nullable>
    <ImplicitUsings>enable</ImplicitUsings>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Azure.ResourceManager.DataFactory" Version="1.3.0" />
    <PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
    <PackageReference Include="RestSharp" Version="111.4.1" />
    <PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
  </ItemGroup>
</Project>
				
			

Configure appsettings.json

Make sure to configure all of your pipeline credentials in the appsettings.json file. Include the following section:

				
					"PipelineConfigure": {
    "TenantId": "...",
    "ClientId": "...",
    "ClientSecret": "..",
    "FactoryName": "...",
    "ResourceGroupName": "..",
    "SubscriptionId": "..",
    "GrantType": "..",
    "Resource": ".."
},
				
			

To read these JSON values from appsettings.json, create a class whose properties match the keys of the section:

				
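public class PipelineConfigure
{
    // Bound from the "PipelineConfigure" section of appsettings.json.
    // Defaults avoid nullable warnings because <Nullable> is enabled in the project.
    public string TenantId { get; set; } = string.Empty;
    public string ClientId { get; set; } = string.Empty;
    public string ClientSecret { get; set; } = string.Empty;
    public string FactoryName { get; set; } = string.Empty;
    public string ResourceGroupName { get; set; } = string.Empty;
    public string SubscriptionId { get; set; } = string.Empty;
    public string GrantType { get; set; } = string.Empty;
    public string Resource { get; set; } = string.Empty;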
}
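
Because every setting is a plain string, a missing or blank value would only surface later as a failed token or API request. The following is a minimal sketch of a startup check that reports which settings are empty. The names ConfigValidator and MissingFields are hypothetical and not part of the original project; the helper works on any object with public string properties.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the original project).
public static class ConfigValidator
{
    // Returns the names of public string properties that are null, empty, or whitespace.
    public static IReadOnlyList<string> MissingFields(object config)
    {
        return config.GetType()
            .GetProperties()
            // Only readable, non-indexed string properties are checked.
            .Where(p => p.PropertyType == typeof(string)
                        && p.CanRead
                        && p.GetIndexParameters().Length == 0)
            .Where(p => string.IsNullOrWhiteSpace(p.GetValue(config) as string))
            .Select(p => p.Name)
            .ToList();
    }
}
```

One possible use is to call ConfigValidator.MissingFields on the bound PipelineConfigure instance at startup and throw an exception that lists the returned names if the list is not empty.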

				
			

Next, let’s register the application services in Program.cs:

				
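builder.Services.AddSingleton(builder.Configuration.GetSection(nameof(PipelineConfigure)).Get<PipelineConfigure>()!);
builder.Services.AddScoped<IADFService, ADFServices>(); // required so the controller can resolve IADFService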
				
			

Create A Service Utility Class

To execute a pipeline using the Azure Data Factory REST API, we need to generate a token, fetch the list of pipelines, call the createRun endpoint to start a pipeline (which returns a Run ID), and then query the status of that run.

These steps are implemented with HTTP requests through RestSharp and with the Azure Resource Manager Data Factory package.

The service class needs a few DTOs, which are listed below.

				
					public class PipelineParam
{
    public string SubscriptionId { get; set; }
    public string ResourceGroupName { get; set; }
    public string FactoryName { get; set; }
    public string Version { get; set; }
    public string Token { get; set; }
}
public class PipelineResponse
{
    public List<Value> Value { get; set; }
}
public class Value
{
    public string Id { get; set; }
    public string Name { get; set; }
    public string Type { get; set; }
    public dynamic Properties { get; set; }
    public string Etag { get; set; }
}
public class CreateRunResponse
{
    public string RunId { get; set; }
}
public class TokenResponse
{
    public string token_type { get; set; }
    public int expires_in { get; set; }
    public int ext_expires_in { get; set; }
    public string access_token { get; set; }
}
				
			

To pass the token to Azure Resource Manager requests, I implemented a custom token credential class, shown below:

				
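using Azure.Core;

// Wraps an already-acquired bearer token so it can be passed to ArmClient.
public class CustomTokenCredential : TokenCredential
{
    private readonly string _token;

    public CustomTokenCredential(string token)
    {
        _token = token;
    }

    // The expiry here is only a placeholder: the wrapped token still expires on Azure's schedule,
    // so create a new credential whenever a new token is fetched.
    public override AccessToken GetToken(TokenRequestContext requestContext, CancellationToken cancellationToken)
    {
        return new AccessToken(_token, DateTimeOffset.MaxValue);
    }

    public override ValueTask<AccessToken> GetTokenAsync(TokenRequestContext requestContext, CancellationToken cancellationToken)
    {
        // No asynchronous work is needed, so return an already completed ValueTask.
        return new ValueTask<AccessToken>(GetToken(requestContext, cancellationToken));
    }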
}

				
			

The IADFService interface and its ADFServices implementation:

				
					using Azure.ResourceManager.DataFactory;
using Azure.ResourceManager;
using AzureDataFactoryPipeline.Services.Models;
using Newtonsoft.Json;
using RestSharp;
using System.Net;

public interface IADFService
{
    Task<IEnumerable<string>> GetPipelinesAsync(PipelineParam pipelineParam);
    Task<string> GetRunIdAsync(string pipelineName,string token);
    Task<string> GetTokenAsync(string tenantId, string grant_type, string client_id, string client_secret, string resource);
    Task<string> GetStatusProcessAsync(string token, string runId);
}

public class ADFServices : IADFService
{
    private readonly PipelineConfigure _config;
    public ADFServices(PipelineConfigure config)
    {
        _config = config;
    }

    public async Task<IEnumerable<string>> GetPipelinesAsync(PipelineParam pipelineParam)
    {
        string base_url = "https://management.azure.com/subscriptions/";
        pipelineParam.Version = "2018-06-01";
        var client = new RestClient($"{base_url}{pipelineParam.SubscriptionId}/resourceGroups/{pipelineParam.ResourceGroupName}/providers/Microsoft.DataFactory/factories/{pipelineParam.FactoryName}/pipelines?api-version={pipelineParam.Version}");
        var request = new RestRequest();
        request.AddHeader("Content-Type", "application/json");
        request.AddHeader("Authorization", "Bearer " + pipelineParam.Token);
        var restResponse = await client.ExecuteGetAsync(request);

        if (restResponse.StatusCode == HttpStatusCode.OK)
        {
            var response = JsonConvert.DeserializeObject<PipelineResponse>(restResponse.Content!);

            if (response is null) return new List<string>();

            return response.Value.Select(x => x.Name).ToList();
        }
        return new List<string>();
    }

    public async Task<string> GetRunIdAsync(string pipelineName,string token)
    {
        var version = "2018-06-01";
        var url = $"https://management.azure.com/subscriptions/{_config.SubscriptionId}/resourceGroups/{_config.ResourceGroupName}/providers/Microsoft.DataFactory/factories/{_config.FactoryName}/pipelines/{pipelineName}/createRun?api-version={version}";
        var client = new RestClient(url);
        var request = new RestRequest();
        request.AddHeader("Content-Type", "application/json");
        request.AddHeader("Authorization", "Bearer " + token);
        var restResponse = await client.ExecutePostAsync(request);

        if (restResponse.StatusCode == HttpStatusCode.OK)
        {
            var response = JsonConvert.DeserializeObject<CreateRunResponse>(restResponse.Content!);

            if (response is null) return string.Empty;

            return response.RunId;
        }

        return string.Empty;
    }

    public async Task<string> GetTokenAsync(string tenantId, string grant_type, string client_id, string client_secret, string resource)
    {
        var url = $"https://login.microsoftonline.com/{tenantId}/oauth2/token";
        var client = new RestClient(url);
        var request = new RestRequest();
        request.AddHeader("Content-Type", "application/x-www-form-urlencoded");
        // nameof(...) yields the exact form field names that the token endpoint expects.
        request.AddParameter(nameof(grant_type), grant_type);
        request.AddParameter(nameof(client_id), client_id);
        request.AddParameter(nameof(client_secret), client_secret);
        request.AddParameter(nameof(resource), resource);

        var restResponse = await client.ExecutePostAsync(request);

        // An empty string signals that no token could be obtained.
        if (restResponse.StatusCode != HttpStatusCode.OK) return string.Empty;

        var response = JsonConvert.DeserializeObject<TokenResponse>(restResponse.Content!);

        return response?.access_token ?? string.Empty;
    }

    public async Task<string> GetStatusProcessAsync(string token, string runId)
    {
        var credential = new CustomTokenCredential(token);

        var armClient = new ArmClient(credential, _config.SubscriptionId);

        var resourceId = DataFactoryResource.CreateResourceIdentifier(
            _config.SubscriptionId,
            _config.ResourceGroupName,
            _config.FactoryName);

        var dataFactory = armClient.GetDataFactoryResource(resourceId);

        var pipelineRun = await dataFactory.GetPipelineRunAsync(runId);

        return pipelineRun.Value.Status;
    }
}
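
The service builds its management URLs by interpolating the subscription, resource group, factory, and pipeline names directly into a string. As a minimal sketch, the URL construction could be pulled into one place that also escapes each path segment, which matters for pipeline names that contain spaces such as “Get Contacts”. The class name AdfUrls and its method names are hypothetical; the base URL and API version are the ones used in the service above.

```csharp
using System;

// Hypothetical helper (not part of the original project).
public static class AdfUrls
{
    private const string BaseUrl = "https://management.azure.com";
    private const string ApiVersion = "2018-06-01";

    // Builds the common ".../factories/{factoryName}" prefix with escaped segments.
    private static string Factory(string subscriptionId, string resourceGroup, string factoryName)
    {
        return $"{BaseUrl}/subscriptions/{Uri.EscapeDataString(subscriptionId)}" +
               $"/resourceGroups/{Uri.EscapeDataString(resourceGroup)}" +
               $"/providers/Microsoft.DataFactory/factories/{Uri.EscapeDataString(factoryName)}";
    }

    // URL for listing all pipelines in a factory.
    public static string ListPipelines(string subscriptionId, string resourceGroup, string factoryName)
    {
        return $"{Factory(subscriptionId, resourceGroup, factoryName)}/pipelines?api-version={ApiVersion}";
    }

    // URL for starting a run of the given pipeline.
    public static string CreateRun(string subscriptionId, string resourceGroup, string factoryName, string pipelineName)
    {
        return $"{Factory(subscriptionId, resourceGroup, factoryName)}" +
               $"/pipelines/{Uri.EscapeDataString(pipelineName)}/createRun?api-version={ApiVersion}";
    }
}
```

With this sketch, a pipeline named “Get Contacts” ends up as Get%20Contacts in the createRun URL, and both service methods share a single definition of the API version.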
				
			

Create A Controller

Once the service class is in place, we need a controller named PipelineController. Let’s create it.

				
					[ApiController]
[Route("api/[controller]")]
public class PipelineController : ControllerBase
{
    private readonly PipelineConfigure _config;
    private readonly IADFService _service;
    public PipelineController(PipelineConfigure config, IADFService service)
    {
        _config = config;
        _service = service;
    }

    [HttpGet]
    public async Task<IActionResult> Get()
    {
        var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource); 

        var param = new PipelineParam
        {
            FactoryName = _config.FactoryName,
            ResourceGroupName = _config.ResourceGroupName,
            SubscriptionId = _config.SubscriptionId,
            Token = token
        };

        var pipelines = await _service.GetPipelinesAsync(param);

        return Ok(pipelines);
    }

    [HttpGet("runId",Name ="runId")]
    public async Task<IActionResult> RunId([FromQuery] string pipelineName)
    {
        var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);

        var runId = await _service.GetRunIdAsync(pipelineName, token);

        return Ok(runId);
    }

    [HttpPost("runStatus",Name ="runStatus")]
    public async Task<IActionResult> Post([FromBody] string runId)
    {
        var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);

        var runStatus = await _service.GetStatusProcessAsync(token,runId);

        return Ok(runStatus);
    }

}
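
Each action in the controller requests a new token before doing its work, although the token response includes an expires_in value. The following is a minimal sketch of a small cache that reuses a token until shortly before it expires. The class CachedTokenProvider, its constructor parameters, and the one-minute safety margin are assumptions made for illustration; the fetch delegate stands in for a call such as GetTokenAsync that would also need to return the expiry.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the original project).
public sealed class CachedTokenProvider
{
    private readonly Func<Task<(string Token, int ExpiresInSeconds)>> _fetch;
    private readonly Func<DateTimeOffset> _now;
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private string _token = string.Empty;
    private DateTimeOffset _expiresAt = DateTimeOffset.MinValue;

    // fetch: obtains a fresh token and its lifetime in seconds.
    // now: clock abstraction, injected so the cache can be tested.
    public CachedTokenProvider(Func<Task<(string Token, int ExpiresInSeconds)>> fetch, Func<DateTimeOffset> now)
    {
        _fetch = fetch;
        _now = now;
    }

    public async Task<string> GetTokenAsync()
    {
        // Serialize callers so that only one of them refreshes the token.
        await _gate.WaitAsync();
        try
        {
            // Refresh when nothing is cached or the token is within one minute of expiring.
            if (_now() + TimeSpan.FromMinutes(1) >= _expiresAt)
            {
                var (token, expiresIn) = await _fetch();
                _token = token;
                _expiresAt = _now().AddSeconds(expiresIn);
            }
            return _token;
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

Registered as a singleton, such a provider would let the three actions share one token instead of calling the login endpoint on every request.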
				
			

Let’s demonstrate all the endpoints

Retrieve Pipelines:

The endpoint first calls the Azure REST API to obtain a token using the configured credentials, and then uses that token to fetch the list of pipelines.

				
var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);

var param = new PipelineParam
{
    FactoryName = _config.FactoryName,
    ResourceGroupName = _config.ResourceGroupName,
    SubscriptionId = _config.SubscriptionId,
    Token = token
};

var pipelines = await _service.GetPipelinesAsync(param);
				
			

After running the application, the “/api/Pipeline” endpoint looks like this:

Figure: Retrieve pipelines from the /api/Pipeline endpoint, with the response body

The response looks like this:

				
					[
	"Get Contacts"
]
				
			

Retrieve Run ID:

To trigger a pipeline, we call the createRun endpoint, which starts the run and returns its Run ID. Let’s create an endpoint that makes this request and returns the Run ID in the response.

				
					[HttpGet("runId",Name ="runId")]
    public async Task<IActionResult> RunId([FromQuery] string pipelineName)
    {
        var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);

        var runId = await _service.GetRunIdAsync(pipelineName, token);

        return Ok(runId);
    }
				
			

For this endpoint, we pass the pipeline name “Get Contacts”, which we got from the previous endpoint.

Figure: Retrieve the Run ID from the “/api/Pipeline/runId” endpoint, with the response body

The response body now contains the Run ID “c8ab422d-09d2-4b08-8948-61690dd96c7a”.

Retrieve run status:

The previous call already started the pipeline run. We now retrieve the current status of that run by making a request with the following code:

				
					[HttpPost("runStatus",Name ="runStatus")]
    public async Task<IActionResult> Post([FromBody] string runId)
    {
        var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);

        var runStatus = await _service.GetStatusProcessAsync(token,runId);

        return Ok(runStatus);
    }
				
			

For this endpoint, we pass the Run ID “c8ab422d-09d2-4b08-8948-61690dd96c7a”, which we got from the previous endpoint.

Figure: Retrieve the run status from the “/api/Pipeline/runStatus” endpoint, with the response body

The endpoint returns the status of the pipeline run, which can be one of the following: Queued, InProgress, Succeeded, Failed, Canceling, or Cancelled.
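
Since a run moves through Queued and InProgress before it finishes, a caller usually has to ask for the status more than once. The following is a minimal polling sketch that repeats the status call until a final status is returned. The class PipelineRunPoller, its parameters, and the choice to treat Succeeded, Failed, and Cancelled as final are assumptions for illustration; the getStatus delegate stands in for a call such as GetStatusProcessAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the original project).
public static class PipelineRunPoller
{
    // Treats Succeeded, Failed, and Cancelled as final; other statuses mean the run is still active.
    public static bool IsTerminal(string status) =>
        status == "Succeeded" || status == "Failed" || status == "Cancelled";

    // Calls getStatus until a final status is returned or maxAttempts is reached.
    // Returns the last status seen, which may still be non-final if attempts ran out.
    public static async Task<string> WaitForCompletionAsync(
        Func<Task<string>> getStatus,
        TimeSpan interval,
        int maxAttempts,
        CancellationToken cancellationToken = default)
    {
        var status = string.Empty;
        for (var attempt = 0; attempt < maxAttempts; attempt++)
        {
            status = await getStatus();
            if (IsTerminal(status)) return status;

            // Wait before asking again so the API is not called in a tight loop.
            await Task.Delay(interval, cancellationToken);
        }
        return status;
    }
}
```

A caller could pass a lambda such as () => _service.GetStatusProcessAsync(token, runId) together with an interval of a few seconds, then update the grid once the returned status is final.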

Conclusion

Azure Data Factory pipelines can be scheduled to run at specific intervals or triggered based on events. Pipeline runs provide monitoring and logging capabilities, allowing you to track the progress, performance, and status of your data integration workflows.
