Azure Data Factory (ADF) is a fully managed, serverless data integration service designed to ingest, prepare, and transform large volumes of data. An Azure subscription can contain one or more Azure Data Factory instances, called data factories.
One of the core components of Azure Data Factory is the pipeline. In Azure Data Factory and Azure Synapse, a single execution of a pipeline is called a pipeline run.
Pipeline runs are typically instantiated by passing arguments to the parameters defined in the pipeline. You can run a pipeline either manually (on demand) or by using a trigger.
This article focuses on running a pipeline on demand through the Azure REST API. In this example, we retrieve a list of pipelines from the Azure API, and each pipeline in the list can be run. When its “Run” button is clicked, the pipeline’s status changes through several stages.
Initially, it displays “InProgress,” then transitions to “Queued,” and finally shows “Succeeded.” A run can also end with the “Failed” status. In this example, the front end is an MVC web application.
Now, let’s discuss the underlying logic used to initiate the pipeline and update its status. Prior to triggering the pipeline, we retrieve all the pipeline names from the Azure API within the controller.
All of this data is populated into the Pipeline entity so that it can be displayed in the grid.
Prerequisites
1. .NET 8.0 (or any compatible .NET version).
2. SQL Server (or any other compatible database).
3. Visual Studio or Visual Studio Code.
Create a Web API project
Create an ASP.NET Core Web API project named AzureDataFactoryPipeline.
Add the Necessary NuGet Packages
Install the NuGet packages the code below depends on: RestSharp, Newtonsoft.Json, and Azure.ResourceManager.DataFactory. Once installed, they are listed in the application’s .csproj file, together with the project settings:
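<PropertyGroup>
  <TargetFramework>net8.0</TargetFramework>
  <Nullable>enable</Nullable>
  <ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>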
Configure appsettings.json
Add your pipeline configuration and credentials to the appsettings.json file, under a PipelineConfigure section that includes the following values:
"PipelineConfigure": {
  "TenantId": "...",
  "ClientId": "...",
  "ClientSecret": "..",
  "FactoryName": "...",
  "ResourceGroupName": "..",
  "SubscriptionId": "..",
  "GrantType": "..",
  "Resource": ".."
},
To read these values from appsettings.json, create a class whose property names match the keys in that section:
public class PipelineConfigure
{
    public string TenantId { get; set; }
    public string ClientId { get; set; }
    public string ClientSecret { get; set; }
    public string FactoryName { get; set; }
    public string ResourceGroupName { get; set; }
    public string SubscriptionId { get; set; }
    public string GrantType { get; set; }
    public string Resource { get; set; }
}
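Because the configuration binder leaves a property at its default value when a key is missing or misspelled, a quick startup check can surface configuration mistakes early. Below is a minimal sketch of such a check; `FindMissingSettings` is a hypothetical helper, not part of the original project, and it simply uses reflection to list string properties that are null or blank.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical helper (not from the original project): returns the names of
// public string properties that are null, empty, or whitespace.
static List<string> FindMissingSettings(object settings)
{
    var missing = new List<string>();
    foreach (PropertyInfo prop in settings.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance))
    {
        if (prop.PropertyType == typeof(string) &&
            string.IsNullOrWhiteSpace((string?)prop.GetValue(settings)))
        {
            missing.Add(prop.Name);
        }
    }
    return missing;
}

// An anonymous object stands in for a bound PipelineConfigure instance.
var sample = new { TenantId = "my-tenant", ClientId = "", FactoryName = (string?)null };
Console.WriteLine(string.Join(", ", FindMissingSettings(sample)));
```

In Program.cs, you could call it on the bound PipelineConfigure instance and fail fast if the returned list is not empty.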
Next, register the configuration object and the application services in Program.cs:
builder.Services.AddSingleton(builder.Configuration.GetSection(nameof(PipelineConfigure)).Get<PipelineConfigure>()!);
builder.Services.AddScoped<IADFService, ADFServices>();
Create A Service Utility Class
To run an Azure Data Factory pipeline through the REST API, we need to:
1. Generate an access token.
2. Fetch the list of pipelines.
3. Call createRun for the chosen pipeline, which starts the run and returns its run ID.
4. Query the status of the run by its run ID.
The token, pipeline list, and createRun requests are plain HTTP calls made with RestSharp; the status query uses the Azure.ResourceManager.DataFactory package.
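All of the Azure Resource Manager requests share the same factory-level URL prefix and API version. The sketch below uses hypothetical helper names; the first two URLs match the ones built in the service class later in this article, and the third is the REST counterpart (Pipeline Runs - Get) of the SDK status call. Escaping the pipeline name with `Uri.EscapeDataString` is an assumption added here so that names containing spaces, such as “Get Contacts”, stay valid in the path.

```csharp
using System;

// Hypothetical helpers (names are illustrative, not from the original project).
static string FactoryUrl(string subscriptionId, string resourceGroup, string factoryName) =>
    $"https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroup}" +
    $"/providers/Microsoft.DataFactory/factories/{factoryName}";

// Step 2: list the pipelines in the factory (GET).
static string ListPipelinesUrl(string sub, string rg, string factory) =>
    $"{FactoryUrl(sub, rg, factory)}/pipelines?api-version=2018-06-01";

// Step 3: start a run of one pipeline (POST); the response contains the runId.
static string CreateRunUrl(string sub, string rg, string factory, string pipelineName) =>
    $"{FactoryUrl(sub, rg, factory)}/pipelines/{Uri.EscapeDataString(pipelineName)}/createRun?api-version=2018-06-01";

// Step 4: read the status of a run (GET), the REST equivalent of GetPipelineRunAsync.
static string PipelineRunUrl(string sub, string rg, string factory, string runId) =>
    $"{FactoryUrl(sub, rg, factory)}/pipelineruns/{runId}?api-version=2018-06-01";

Console.WriteLine(CreateRunUrl("my-sub", "my-rg", "my-factory", "Get Contacts"));
```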
The service class needs a few DTOs, shown below:
public class PipelineParam
{
    public string SubscriptionId { get; set; }
    public string ResourceGroupName { get; set; }
    public string FactoryName { get; set; }
    public string Version { get; set; }
    public string Token { get; set; }
}
public class PipelineResponse
{
    // One entry per pipeline returned by the list call.
    public List<Value> Value { get; set; } = new();
}
public class Value
{
    public string Id { get; set; }
    public string Name { get; set; }
    public string Type { get; set; }
    public dynamic Properties { get; set; }
    public string Etag { get; set; }
}
public class CreateRunResponse
{
    public string RunId { get; set; }
}
public class TokenResponse
{
    public string token_type { get; set; }
    public int expires_in { get; set; }
    public int ext_expires_in { get; set; }
    public string access_token { get; set; }
}
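The list call returns a JSON object whose `value` array holds one entry per pipeline, which is what the PipelineResponse and Value DTOs model. The sketch below extracts the names with System.Text.Json instead of Newtonsoft.Json, purely to stay dependency-free; `ParsePipelineNames` and the sample payload are hypothetical. Large factories may also return a `nextLink` for further pages, which these DTOs ignore.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Extracts pipeline names from a "Pipelines - List By Factory" response body,
// mirroring what PipelineResponse/Value plus Select(x => x.Name) do.
static List<string> ParsePipelineNames(string json)
{
    var names = new List<string>();
    using JsonDocument doc = JsonDocument.Parse(json);
    if (doc.RootElement.TryGetProperty("value", out JsonElement items))
    {
        foreach (JsonElement item in items.EnumerateArray())
        {
            if (item.TryGetProperty("name", out JsonElement name) && name.GetString() is string s)
            {
                names.Add(s);
            }
        }
    }
    return names;
}

// Trimmed, hypothetical response body; real entries also carry id, properties, and etag.
string sampleBody = @"{ ""value"": [ { ""name"": ""Get Contacts"", ""type"": ""Microsoft.DataFactory/factories/pipelines"" } ] }";
Console.WriteLine(string.Join(", ", ParsePipelineNames(sampleBody)));
```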
To pass the access token we already acquired to the Azure Resource Manager SDK, I implemented a custom token credential:
using Azure.Core;
// Wraps an access token that was already acquired so the Azure SDK can use it.
public class CustomTokenCredential : TokenCredential
{
    private readonly string _token;
    public CustomTokenCredential(string token)
    {
        _token = token;
    }
    // The expiry is set to MaxValue because a new credential is created for each request.
    public override AccessToken GetToken(TokenRequestContext requestContext, CancellationToken cancellationToken)
    {
        return new AccessToken(_token, DateTimeOffset.MaxValue);
    }
    public override ValueTask<AccessToken> GetTokenAsync(TokenRequestContext requestContext, CancellationToken cancellationToken)
    {
        return new ValueTask<AccessToken>(new AccessToken(_token, DateTimeOffset.MaxValue));
    }
}
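The controller in this article requests a new token on every call. If that becomes a concern, one option is to cache the token until shortly before it expires. The sketch below is a hypothetical helper, `CreateCachedTokenProvider`, that wraps any fetch function returning the token and its `expires_in` lifetime; the five-minute refresh margin is an arbitrary choice, and the clock is injectable only to make it testable. It is not thread-safe.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not from the original project). Reuses the cached token
// until 5 minutes before it expires, then calls fetch again.
// Not thread-safe: concurrent callers may trigger more than one fetch.
static Func<Task<string>> CreateCachedTokenProvider(
    Func<Task<(string Token, int ExpiresInSeconds)>> fetch,
    Func<DateTimeOffset> now)
{
    string? cached = null;
    DateTimeOffset refreshAt = DateTimeOffset.MinValue;

    return async () =>
    {
        if (cached is null || now() >= refreshAt)
        {
            var (token, expiresIn) = await fetch();
            cached = token;
            refreshAt = now().AddSeconds(expiresIn).AddMinutes(-5);
        }
        return cached;
    };
}
```

In the controller, the fetch function would wrap GetTokenAsync, and the provider itself would need to live in a singleton so the cache survives between requests.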
The IADFService interface and its ADFServices implementation:
using Azure.ResourceManager.DataFactory;
using Azure.ResourceManager;
using AzureDataFactoryPipeline.Services.Models;
using Newtonsoft.Json;
using RestSharp;
using System.Net;
public interface IADFService
{
    Task<List<string>> GetPipelinesAsync(PipelineParam pipelineParam);
    Task<string> GetRunIdAsync(string pipelineName, string token);
    Task<string> GetTokenAsync(string tenantId, string grant_type, string client_id, string client_secret, string resource);
    Task<string> GetStatusProcessAsync(string token, string runId);
}
public class ADFServices : IADFService
{
    private readonly PipelineConfigure _config;
    public ADFServices(PipelineConfigure config)
    {
        _config = config;
    }
    // Lists the names of all pipelines in the data factory.
    public async Task<List<string>> GetPipelinesAsync(PipelineParam pipelineParam)
    {
        string base_url = "https://management.azure.com/subscriptions/";
        pipelineParam.Version = "2018-06-01";
        var client = new RestClient($"{base_url}{pipelineParam.SubscriptionId}/resourceGroups/{pipelineParam.ResourceGroupName}/providers/Microsoft.DataFactory/factories/{pipelineParam.FactoryName}/pipelines?api-version={pipelineParam.Version}");
        var request = new RestRequest();
        request.AddHeader("Content-Type", "application/json");
        request.AddHeader("Authorization", "Bearer " + pipelineParam.Token);
        var restResponse = await client.ExecuteGetAsync(request);
        if (restResponse.StatusCode == HttpStatusCode.OK)
        {
            var response = JsonConvert.DeserializeObject<PipelineResponse>(restResponse.Content!);
            if (response?.Value is null) return new List<string>();
            return response.Value.Select(x => x.Name).ToList();
        }
        return new List<string>();
    }
    // Starts a new run of the given pipeline (createRun) and returns its run ID.
    public async Task<string> GetRunIdAsync(string pipelineName, string token)
    {
        var version = "2018-06-01";
        var url = $"https://management.azure.com/subscriptions/{_config.SubscriptionId}/resourceGroups/{_config.ResourceGroupName}/providers/Microsoft.DataFactory/factories/{_config.FactoryName}/pipelines/{Uri.EscapeDataString(pipelineName)}/createRun?api-version={version}";
        var client = new RestClient(url);
        var request = new RestRequest();
        request.AddHeader("Content-Type", "application/json");
        request.AddHeader("Authorization", "Bearer " + token);
        var restResponse = await client.ExecutePostAsync(request);
        if (restResponse.StatusCode == HttpStatusCode.OK)
        {
            var response = JsonConvert.DeserializeObject<CreateRunResponse>(restResponse.Content!);
            if (response is null) return string.Empty;
            return response.RunId;
        }
        return string.Empty;
    }
    // Requests an access token from the Microsoft Entra ID (v1) token endpoint
    // using the client credentials stored in appsettings.json.
    public async Task<string> GetTokenAsync(string tenantId, string grant_type, string client_id, string client_secret, string resource)
    {
        var url = $"https://login.microsoftonline.com/{tenantId}/oauth2/token";
        var client = new RestClient(url);
        var request = new RestRequest();
        request.AddHeader("Content-Type", "application/x-www-form-urlencoded");
        request.AddParameter(nameof(grant_type), grant_type);
        request.AddParameter(nameof(client_id), client_id);
        request.AddParameter(nameof(client_secret), client_secret);
        request.AddParameter(nameof(resource), resource);
        var restResponse = await client.ExecutePostAsync(request);
        if (restResponse.StatusCode == HttpStatusCode.OK)
        {
            var response = JsonConvert.DeserializeObject<TokenResponse>(restResponse.Content!);
            if (response is null) return string.Empty;
            return response.access_token;
        }
        return string.Empty;
    }
    // Reads the current status of a pipeline run through the Azure Resource Manager SDK.
    public async Task<string> GetStatusProcessAsync(string token, string runId)
    {
        var credential = new CustomTokenCredential(token);
        var armClient = new ArmClient(credential, _config.SubscriptionId);
        var resourceId = DataFactoryResource.CreateResourceIdentifier(
            _config.SubscriptionId,
            _config.ResourceGroupName,
            _config.FactoryName);
        var dataFactory = armClient.GetDataFactoryResource(resourceId);
        var pipelineRun = await dataFactory.GetPipelineRunAsync(runId);
        return pipelineRun.Value.Status;
    }
}
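GetTokenAsync posts a form-encoded body to the Microsoft Entra ID (Azure AD) v1 token endpoint and reads `access_token` from the JSON reply. For a service principal, `grant_type` is normally `client_credentials` and `resource` is `https://management.azure.com/`. The sketch below reproduces the body and the parsing with only the base class library (System.Net.Http and System.Text.Json) rather than RestSharp and Newtonsoft.Json; the helper names, client ID, and secret are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;

// Builds the same form fields that GetTokenAsync sends with RestSharp.
// grant_type and resource are fixed here to the usual client-credentials values.
static FormUrlEncodedContent BuildTokenRequestBody(string clientId, string clientSecret) =>
    new FormUrlEncodedContent(new Dictionary<string, string>
    {
        ["grant_type"] = "client_credentials",
        ["client_id"] = clientId,
        ["client_secret"] = clientSecret,
        ["resource"] = "https://management.azure.com/"
    });

// Reads access_token from the token endpoint's JSON response; other fields are ignored.
static string ReadAccessToken(string json)
{
    using JsonDocument doc = JsonDocument.Parse(json);
    return doc.RootElement.GetProperty("access_token").GetString() ?? string.Empty;
}

// Print the encoded body for hypothetical credentials.
string formBody = await BuildTokenRequestBody("my-client-id", "my-secret").ReadAsStringAsync();
Console.WriteLine(formBody);
```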
Create A Controller
With the service class in place, create a controller named PipelineController:
using Microsoft.AspNetCore.Mvc;
[ApiController]
[Route("api/[controller]")]
public class PipelineController : ControllerBase
{
    private readonly PipelineConfigure _config;
    private readonly IADFService _service;
    public PipelineController(PipelineConfigure config, IADFService service)
    {
        _config = config;
        _service = service;
    }
    // GET api/Pipeline: returns the names of all pipelines in the factory.
    [HttpGet]
    public async Task<IActionResult> Get()
    {
        var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);
        var param = new PipelineParam
        {
            FactoryName = _config.FactoryName,
            ResourceGroupName = _config.ResourceGroupName,
            SubscriptionId = _config.SubscriptionId,
            Token = token
        };
        var pipelines = await _service.GetPipelinesAsync(param);
        return Ok(pipelines);
    }
    // GET api/Pipeline/runId?pipelineName=...: starts a run and returns its run ID.
    // Note that this GET request has a side effect: it starts the pipeline.
    [HttpGet("runId", Name = "runId")]
    public async Task<IActionResult> RunId([FromQuery] string pipelineName)
    {
        var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);
        var runId = await _service.GetRunIdAsync(pipelineName, token);
        return Ok(runId);
    }
    // POST api/Pipeline/runStatus with the run ID as a JSON string body.
    [HttpPost("runStatus", Name = "runStatus")]
    public async Task<IActionResult> Post([FromBody] string runId)
    {
        var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);
        var runStatus = await _service.GetStatusProcessAsync(token, runId);
        return Ok(runStatus);
    }
}
Let’s demonstrate all the endpoints
Retrieve Pipelines:
This endpoint first calls the Azure REST API to obtain a token using the configured pipeline credentials, then uses that token to list the pipelines:
var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);
var param = new PipelineParam
{
    FactoryName = _config.FactoryName,
    ResourceGroupName = _config.ResourceGroupName,
    SubscriptionId = _config.SubscriptionId,
    Token = token
};
var pipelines = await _service.GetPipelinesAsync(param);
After running the application, calling the “/api/Pipeline” endpoint returns a response like this:
[
  "Get Contacts"
]
Retrieve Run ID:
Calling createRun for a pipeline starts a new run and returns its run ID, which we need in order to track that run. The following endpoint wraps this call:
[HttpGet("runId", Name = "runId")]
public async Task<IActionResult> RunId([FromQuery] string pipelineName)
{
    var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);
    var runId = await _service.GetRunIdAsync(pipelineName, token);
    return Ok(runId);
}
For this endpoint, pass the pipeline name “Get Contacts”, which we got from the previous endpoint, in the pipelineName query parameter.
The response body contains the run ID “c8ab422d-09d2-4b08-8948-61690dd96c7a”.
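GetRunIdAsync sends createRun with an empty body, so no arguments are supplied and any default parameter values defined on the pipeline apply. As mentioned at the start, runs usually receive arguments for the pipeline's parameters; with the REST API these go in the POST body as a JSON object that maps parameter names to values. The sketch below only builds that body; `BuildCreateRunBody` and the parameter names are hypothetical and must match parameters actually defined on your pipeline.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Serializes pipeline arguments into the JSON body expected by createRun.
static string BuildCreateRunBody(IDictionary<string, object> arguments) =>
    JsonSerializer.Serialize(arguments);

// Hypothetical parameter names; they must exist on the target pipeline.
var arguments = new Dictionary<string, object>
{
    ["sourceFolder"] = "contacts",
    ["batchSize"] = 500
};
Console.WriteLine(BuildCreateRunBody(arguments)); // {"sourceFolder":"contacts","batchSize":500}
```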
Retrieve the pipeline run status:
Next, we retrieve the current status of the pipeline run that the previous call started. This can be done with the following code:
[HttpPost("runStatus", Name = "runStatus")]
public async Task<IActionResult> Post([FromBody] string runId)
{
    var token = await _service.GetTokenAsync(_config.TenantId, _config.GrantType, _config.ClientId, _config.ClientSecret, _config.Resource);
    var runStatus = await _service.GetStatusProcessAsync(token, runId);
    return Ok(runStatus);
}
For this endpoint, send the run ID “c8ab422d-09d2-4b08-8948-61690dd96c7a”, which we got from the previous endpoint, as a JSON string in the request body.
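Because the action binds `[FromBody] string runId`, the body must be a JSON string literal, quotes included, rather than the bare ID. The sketch below shows how a client such as the MVC front end might build both requests with HttpClient types; `RunIdRequest` and `RunStatusRequest` are hypothetical helpers, and the relative URIs assume an HttpClient whose BaseAddress points at the API.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Json;

// Hypothetical client-side helpers for calling the PipelineController endpoints.
static HttpRequestMessage RunIdRequest(string pipelineName) =>
    new HttpRequestMessage(HttpMethod.Get,
        $"api/Pipeline/runId?pipelineName={Uri.EscapeDataString(pipelineName)}");

// JsonContent serializes the string as a JSON string literal, e.g. "abc" with quotes.
static HttpRequestMessage RunStatusRequest(string runId) =>
    new HttpRequestMessage(HttpMethod.Post, "api/Pipeline/runStatus")
    {
        Content = JsonContent.Create(runId)
    };

using var request = RunStatusRequest("c8ab422d-09d2-4b08-8948-61690dd96c7a");
Console.WriteLine(await request.Content!.ReadAsStringAsync());
```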
The endpoint returns the status of the pipeline run, which is one of the following: Queued, InProgress, Succeeded, Failed, Canceling, or Cancelled.
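Queued, InProgress, and Canceling are transitional, while Succeeded, Failed, and Cancelled are final, so a client such as the grid described earlier can keep polling until a final status appears. The sketch below is a hypothetical polling helper; `getStatus` stands in for whatever call fetches the status (for example, the runStatus endpoint), and the interval and attempt limit are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Returns true once a run has reached a final state.
static bool IsTerminal(string status) =>
    status is "Succeeded" or "Failed" or "Cancelled";

// Hypothetical polling helper: calls getStatus until the run is final or
// maxAttempts calls have been made, waiting `interval` between calls.
static async Task<string> WaitForCompletionAsync(
    Func<Task<string>> getStatus, TimeSpan interval, int maxAttempts)
{
    string status = await getStatus();
    for (int attempt = 1; attempt < maxAttempts && !IsTerminal(status); attempt++)
    {
        await Task.Delay(interval);
        status = await getStatus();
    }
    return status; // may still be non-final if maxAttempts was reached
}

// Usage with a fake status source that replays a sample sequence.
var statuses = new Queue<string>(new[] { "Queued", "InProgress", "Succeeded" });
string final = await WaitForCompletionAsync(
    () => Task.FromResult(statuses.Dequeue()), TimeSpan.FromMilliseconds(10), maxAttempts: 10);
Console.WriteLine(final); // Succeeded
```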
Conclusion
Azure Data Factory pipelines can be scheduled to run at specific intervals or triggered based on events. Pipeline runs provide monitoring and logging capabilities, allowing you to track the progress, performance, and status of your data integration workflows.