Posts

Showing posts with the label dataeng

Implement a task orchestrator (2)

This follows the previous post, Implement a task orchestrator (1). As the second part of the task orchestrator, we will implement the scheduler.

Scheduler

The scheduler is a standalone application that continuously checks the existing pipelines and schedules jobs. Before we create the scheduler, we first want to get all the existing pipelines and parse them into a format the scheduler can handle. First, we create a parse module that imports all modules in a file and saves the pipelines.

import importlib, sys, time
from model.Pipeline import Pipeline

def parse(filepath):
    mod_name = f"{filepath}_{time.time()}"
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    loader.exec_module(module)
    pipelines = []
    for attr in module.__dict__.values():
        if isinstance(a…
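The excerpt is cut off mid-function. As a rough sketch of where it is heading (everything after the isinstance check is my assumption, not the author's original; Pipeline is the class from part 1, and I import the importlib submodules explicitly to be safe), parse() would collect every Pipeline object defined in the file:

import importlib.machinery
import importlib.util
import sys
import time

from model.Pipeline import Pipeline  # the Pipeline class introduced in part 1


def parse(filepath):
    # Load the pipeline file under a unique module name so repeated parses
    # of the same file do not collide in sys.modules.
    mod_name = f"{filepath}_{time.time()}"
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    loader.exec_module(module)

    # Assumed continuation: keep every module-level attribute that is a Pipeline.
    pipelines = []
    for attr in module.__dict__.values():
        if isinstance(attr, Pipeline):
            pipelines.append(attr)
    return pipelines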

Implement a task orchestrator (1)

What is a task orchestrator?

Orchestration is the coordination and management of different systems, services, and processes. A task orchestrator orchestrates multiple tasks, which can have an execution order, schedules, and complex dependencies. One of the most commonly used orchestrators is Airflow.

What to build?

The goal of this exercise is to implement a task orchestrator in Python, so that we can create a pipeline of multiple tasks with a schedule as below, just like in Airflow. In this setup, a scheduler will read all the pipelines in a pre-defined folder and execute them on their schedules.

pipeline = Pipeline(name='test', schedule='0 0 * * * *')

def print_test1():
    print('test1')

def print_test2():
    print('test2')

task_1 = PythonTask('print_test1', python_callable=print_test1, pipeline=pipeline)
task_2 = PythonTask('print_test2…
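For readability, here is the truncated example written out in full. This is only a sketch of the target API: Pipeline and PythonTask are the classes developed in this series, and task_2's remaining arguments are assumed to mirror task_1 (they are not visible in the excerpt).

# Assumed completion of the example above, mirroring the visible part.
pipeline = Pipeline(name='test', schedule='0 0 * * * *')

def print_test1():
    print('test1')

def print_test2():
    print('test2')

task_1 = PythonTask('print_test1', python_callable=print_test1, pipeline=pipeline)
task_2 = PythonTask('print_test2', python_callable=print_test2, pipeline=pipeline)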

Terraform Error: updating Data Factory - Managed Virtual Network

While upgrading the Terraform azurerm provider from v2.6.4 to v2.81.0, terraform apply complained about the managed virtual network on Azure Data Factory.

Error: updating Data Factory: (Factory Name "adf-xxxx" / Resource Group "rg-xxxx"): once Managed Virtual Network has been Enabled it's not possible to disable it
│
│   with module.data_factory.azurerm_data_factory.data_factory_xxx,
│   on modules/data_factory/main.tf line 9, in resource "azurerm_data_factory" "data_factory_xxx":
│    9: resource "azurerm_data_factory" "data_factory_xxx" {

This happened because azurerm v2.81.0 re-stages the resource (the data factory) with a new schema, and while doing so, the managed virtual network that had been enabled outside of Terraform became an issue. According to the error log, Terraform seems to disable the existing managed virtual network, while I want to leave it enabled. Fortunately, there was a managed_virtual_network_enabled para…

Databricks Authentication Error with Terraform provider upgrade (azurerm > v2.8.1)

Problem

While trying to upgrade the Terraform provider azurerm from v2.6.4 to v3.10.0, I faced the following error when I ran terraform plan.

cannot configure azure-client-secret auth: cannot get workspace: please set `azure_workspace_resource_id` provider argument. Attributes used: auth_type. Environment variables used: ARM_CLIENT_SECRET, ARM_CLIENT_ID, ARM_TENANT_ID. Please check https://registry.terraform.io/providers/databrickslabs/databricks/latest/docs#authentication for details

Databricks authentication, which used to work fine, suddenly stopped working with the version change, saying that azure_workspace_resource_id is not set even though it is there.

provider "databricks" {
  azure_workspace_resource_id = <databricks_workspace_id>
}

The Terraform version I used was v1.2.3, and the databricks provider was v1.0.0. The authentication failed not only with azure-client-secret but also with azure-cli.

Try 1 (x)

provider "databricks" {
  azure…

How to Run Airflow on Kubernetes

This post will go through the steps to run the Airflow webserver and scheduler on Kubernetes using minikube. If you don't have minikube installed yet, please have a look at this post first. As introduced in this post, Airflow consists of different components, and therefore we need to spin up multiple pods. For this practice, we will create services for the webserver, the scheduler, and a Postgres DB (meta database). For the workers, we will use the LocalExecutor for now.

Deploy Airflow on Kubernetes

1. Deploy the Postgres database

We first need to have Postgres up and running. We can create a pod and a service by defining them in postgres.yaml.

kind: Deployment
apiVersion: apps/v1
metadata:
  name: postgres-airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      deploy: postgres-airflow
  template:
    metadata:
      labels:
        name: postgres-airflow
        deploy: postgres-airflow
    spec:
      restartPolicy: Always
      containers:
        - name: postgres
          image: postgres:13.4
          port…

Airflow Dependencies & Trigger Rules

Setting Dependencies between tasks in Airflow

There are different ways to set dependencies in Airflow. The simplest way is to use the bitshift operators << and >>:

task1 >> task2
task3 >> [task4, task5] >> task6

or to use the functions set_upstream and set_downstream:

task1.set_downstream(task2)

Running dag.test_cycle() can be helpful (e.g., in a unit test) to make sure that the DAG doesn't contain a cycle, as cycles are not allowed. In case you want to set multiple parallel dependencies, you can use the function chain, since a bitshift operator between two lists like [t1, t2] >> [t3, t4] is not supported:

chain([task1, task3], [task2, task4], task5)

When you set dependencies between tasks, Airflow by default runs a task only when all of its parents have succeeded. However, you can add more complex rules by using trigger rules.

Trigger Rules

Trigger rules can be specified as an argument on operators.

all_suc…
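As a quick illustration of the patterns described above, here is a minimal, self-contained sketch (assuming Airflow 2.4+; the DAG id and task ids are made up for the example):

from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

# Hypothetical DAG purely to illustrate the dependency patterns from the post.
with DAG(dag_id="dependency_examples", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    task1, task2, task3, task4, task5, task6 = (
        EmptyOperator(task_id=f"task{i}") for i in range(1, 7)
    )

    # Bitshift operators: task1 runs before task2; task4 and task5 fan out
    # from task3, and both must finish before task6.
    task1 >> task2
    task3 >> [task4, task5] >> task6

    # chain() covers list-to-list wiring that >> cannot express directly:
    # chain([a, c], [b, d], e) sets a >> b, c >> d, then b >> e and d >> e.
    a, b, c, d, e = (EmptyOperator(task_id=name) for name in "abcde")
    chain([a, c], [b, d], e)

    # trigger_rule changes when a task fires; ALL_DONE runs once every upstream
    # task has finished, regardless of success or failure.
    cleanup = EmptyOperator(task_id="cleanup", trigger_rule=TriggerRule.ALL_DONE)
    task6 >> cleanup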

What is Apache Airflow?

Airflow is an open-source orchestration tool to programmatically author, schedule, and monitor workflows of data pipelines. It allows you to automate ETL tasks for data engineering and to build workflows with complex dependencies. Source: https://airflow.apache.org/docs/apache-airflow/2.0.1/concepts.html

Airflow mainly consists of the following components:

Web Server: The web server is the UI that can be used to get an overview of the overall health of different Directed Acyclic Graphs (DAGs) and also to visualize the different components and states of each DAG. You can also manage users, roles, and different configurations for the Airflow setup on the web server.

Scheduler: The scheduler orchestrates the various DAGs and their tasks, taking care of their interdependencies, limiting the number of runs of each DAG so that one DAG doesn't overwhelm the entire system, and making it easy for users to schedule and run DAGs on Airflow.

Executor: The executors are the components that actually execute tasks. Th…