Posts

Implement a task orchestrator (2)

This follows the previous post, Implement a task orchestrator (1). As the second part of the task orchestrator, we will implement the scheduler.

Scheduler

The scheduler is a standalone application that continuously checks the existing pipelines and schedules jobs. Before we create the scheduler, we first want to get all the existing pipelines and parse them into a format that the scheduler can handle. First, we create a parse module that imports all modules in a file and saves the pipelines.

import importlib.machinery, importlib.util, sys, time
from model.Pipeline import Pipeline

def parse(filepath):
    mod_name = f"{filepath}_{time.time()}"
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    loader.exec_module(module)
    pipelines = []
    for attr in module.__dict__.values():
        if isinstance(a
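The excerpt cuts off inside the loop. A minimal sketch of how the rest of parse() plausibly looks, assuming the loop simply collects every Pipeline instance defined in the loaded module (the return statement is my assumption, not the post's code):

import importlib.machinery
import importlib.util
import sys
import time

from model.Pipeline import Pipeline  # the post's own Pipeline model

def parse(filepath):
    # Load the file under a unique module name so repeated parses don't collide.
    mod_name = f"{filepath}_{time.time()}"
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    loader.exec_module(module)

    # Keep only the module-level objects that are Pipeline instances.
    pipelines = []
    for attr in module.__dict__.values():
        if isinstance(attr, Pipeline):
            pipelines.append(attr)
    return pipelines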

Implement a task orchestrator (1)

What is a task orchestrator? Orchestration is the coordination and management of different systems, services, processes, etc. A task orchestrator orchestrates multiple tasks, which can have a sequence, execution schedules, and complex dependencies. One of the most commonly used orchestrators is Airflow.

What to build? The goal of this exercise is to implement a task orchestrator in Python, so that we can create a pipeline of multiple tasks with a schedule as below, just like in Airflow. In this setup, a scheduler will read all the pipelines in a pre-defined folder and execute them on the right schedules.

pipeline = Pipeline(name='test', schedule='0 0 * * * *')

def print_test1():
    print('test1')

def print_test2():
    print('test2')

task_1 = PythonTask('print_test1', python_callable=print_test1, pipeline=pipeline)
task_2 = PythonTask('print_test2
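For context, a minimal sketch of the kind of Pipeline and PythonTask classes this API implies. This is an assumption about the shape of the implementation, not the post's actual code; any attribute or method beyond what is visible in the excerpt is hypothetical:

class Pipeline:
    # Groups tasks under a name and a cron-style schedule.
    def __init__(self, name, schedule):
        self.name = name
        self.schedule = schedule
        self.tasks = []  # hypothetical: filled in as tasks register themselves

class PythonTask:
    # Wraps a Python callable and registers itself on its pipeline.
    def __init__(self, name, python_callable, pipeline):
        self.name = name
        self.python_callable = python_callable
        self.pipeline = pipeline
        pipeline.tasks.append(self)

    def run(self):
        return self.python_callable()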

Terraform Error: updating Data Factory - Managed Virtual Network

While upgrading the Terraform azurerm provider from v2.6.4 to v2.81.0, terraform apply complained about the managed virtual network on Azure Data Factory.

Error: updating Data Factory: (Factory Name "adf-xxxx" / Resource Group "rg-xxxx"): once Managed Virtual Network has been Enabled it's not possible to disable it
│
│   with module.data_factory.azurerm_data_factory.data_factory_xxx,
│   on modules/data_factory/main.tf line 9, in resource "azurerm_data_factory" "data_factory_xxx":
│    9: resource "azurerm_data_factory" "data_factory_xxx" {

This happened because azurerm v2.81.0 restages the resource (the data factory) with a new schema, and while doing so, the managed virtual network that had been enabled outside of Terraform became an issue. According to the error log, Terraform seems to disable the existing managed virtual network, while I want to leave it enabled. Fortunately, there was a managed_virtual_network_enabled para

Databricks Authentication Error with Terraform provider upgrade (azurerm > v2.8.1)

Problem

While trying to upgrade the Terraform provider azurerm from v2.6.4 to v3.10.0, I faced the following error when I ran terraform plan.

cannot configure azure-client-secret auth: cannot get workspace: please set `azure_workspace_resource_id` provider argument. Attributes used: auth_type. Environment variables used: ARM_CLIENT_SECRET, ARM_CLIENT_ID, ARM_TENANT_ID. Please check https://registry.terraform.io/providers/databrickslabs/databricks/latest/docs#authentication for details

Databricks authentication, which used to work fine, suddenly stopped working after the version change, saying that azure_workspace_resource_id is not set even though it is there.

provider "databricks" {
    azure_workspace_resource_id = <databricks_workspace_id>
}

The Terraform version I used was v1.2.3, and the Databricks provider was v1.0.0. The authentication failed not only with azure-client-secret but also with azure-cli.

Try 1 (x)

provider "databricks" {
    azure

Monitoring with Prometheus

Prometheus is a free software application used for event monitoring and alerting. It records real-time metrics in a time-series database built using an HTTP pull model, with flexible queries and real-time alerting.

Why monitoring? When we build an application, problems will occur regardless of how well it has been designed or built. To be prepared to deal with any problems, we should be able to collect logs and diagnose issues when something goes wrong. Monitoring is a valuable source of information for deciding how to act on a problem in the system. To monitor your application efficiently, you generally need something to measure, a way to visualize its state, and alerting when something goes wrong.

What is Prometheus? Prometheus is an open-source monitoring framework written in Go. PromQL (Prometheus Query Language) can be used to query the Prometheus data, and it has no reliance on distributed storage, as single server nodes are autonomous. Prometheus uses an HTTP interface,
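To make the pull model concrete, here is a minimal sketch (not from the post) using the official prometheus_client Python library; the metric name and port are arbitrary choices for illustration:

from prometheus_client import Counter, start_http_server
import time

# A simple counter metric; the Prometheus server scrapes its current value over HTTP.
REQUESTS_TOTAL = Counter('demo_requests_total', 'Total number of simulated requests')

if __name__ == '__main__':
    # Expose /metrics on port 8000 for Prometheus to pull from.
    start_http_server(8000)
    while True:
        REQUESTS_TOTAL.inc()  # simulate some work being counted
        time.sleep(1)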

How to Run Airflow on Kubernetes

This post will go through the steps to run the Airflow webserver and scheduler on Kubernetes using minikube. If you don't have minikube installed yet, please have a look at this post first. As introduced in this post, Airflow consists of different components, and therefore we need to spin up multiple pods. For this practice, we will create services for the Webserver, the Scheduler, and the Postgres DB (meta database). For workers, we will use the LocalExecutor for now.

Deploy Airflow on Kubernetes

1. Deploy Postgres database

We first need to have Postgres up and running. We can create a pod and a service by defining them in postgres.yaml.

kind: Deployment
apiVersion: apps/v1
metadata:
  name: postgres-airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      deploy: postgres-airflow
  template:
    metadata:
      labels:
        name: postgres-airflow
        deploy: postgres-airflow
    spec:
      restartPolicy: Always
      containers:
        - name: postgres
          image: postgres:13.4
          port

Airflow Dependencies & Trigger Rules

Setting Dependencies between tasks in Airflow

There are different ways to set dependencies in Airflow. The simplest way is to use the bitshift operators << and >>.

task1 >> task2
task3 >> [task4, task5] >> task6

Alternatively, use the functions set_upstream and set_downstream.

task1.set_downstream(task2)

Running dag.test_cycle() can be helpful (e.g. in a unit test) to make sure the DAG doesn't contain a cycle, as cycles are not allowed. In case you want to set multiple parallel dependencies, you can use the chain function, since a bitshift operator between two lists like [t1, t2] >> [t3, t4] is not supported.

chain([task1, task3], [task2, task4], task5)

When you set dependencies between tasks, Airflow by default runs a task only when all of its parents have succeeded. However, you can express more complex behaviour using trigger rules.

Trigger Rules

Trigger rules can be specified as an argument on operators. all_suc
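As a rough illustration of the trigger_rule argument (a sketch assuming Airflow 2.x with the PythonOperator, not the post's own example), here is a cleanup task that runs once all upstream tasks are done, whether they succeeded or failed:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG('trigger_rule_demo', start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    extract = PythonOperator(task_id='extract', python_callable=lambda: print('extract'))
    transform = PythonOperator(task_id='transform', python_callable=lambda: print('transform'))

    # ALL_DONE: run as soon as every upstream task has finished, regardless of outcome.
    cleanup = PythonOperator(
        task_id='cleanup',
        python_callable=lambda: print('cleanup'),
        trigger_rule=TriggerRule.ALL_DONE,
    )

    [extract, transform] >> cleanup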