How to Run Airflow on Kubernetes

This post walks through the steps to run the Airflow webserver and scheduler on Kubernetes using minikube.

If you don't have minikube installed yet, please have a look at this post first. 

As introduced in this post, Airflow consists of several components, so we need to spin up multiple pods. For this exercise, we will create a Deployment and a Service for each of:

  • Webserver
  • Scheduler
  • Postgres DB (metadata database)

Instead of separate worker pods, we will use the LocalExecutor for now, so tasks run as subprocesses of the scheduler.

Deploy Airflow on Kubernetes

1. Deploy Postgres database

We first need to have postgres up and running.
We can create a pod and a service by defining them in postgres.yaml.

kind: Deployment
apiVersion: apps/v1
metadata:
  name: postgres-airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      deploy: postgres-airflow
  template:
    metadata:
      labels:
        name: postgres-airflow
        deploy: postgres-airflow
    spec:
      restartPolicy: Always
      containers:
        - name: postgres
          image: postgres:13.4
          ports:
            - containerPort: 5432
              protocol: TCP
          volumeMounts:
            - name: dbvol
              mountPath: /var/lib/postgresql/data/pgdata
              subPath: pgdata
          env:
            - name: POSTGRES_USER
              value: airflow
            - name: POSTGRES_PASSWORD
              value: airflow
            - name: POSTGRES_DB
              value: airflow
            - name: PGDATA
              value: /var/lib/postgresql/data/pgdata
            - name: POD_IP
              valueFrom: { fieldRef: { fieldPath: status.podIP } }
          livenessProbe:
            initialDelaySeconds: 60
            timeoutSeconds: 5
            failureThreshold: 5
            exec:
              command:
              - /bin/bash
              - -c
              - exec pg_isready --host "$POD_IP" || if [[ $(psql -qtAc --host "$POD_IP" 'SELECT pg_is_in_recovery()') != "f" ]]; then exit 0; else exit 1; fi
          readinessProbe:
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 5
            exec:
              command:
              - /bin/sh
              - -c
              - exec pg_isready --host $POD_IP
          resources:
            requests:
              memory: 512Mi
              cpu: 500m
      volumes:
        - name: dbvol
          emptyDir: {}
---
kind: Service
apiVersion: v1
metadata:
  name: postgres-airflow
spec:
  selector:
    name: postgres-airflow
  ports:
  - name: postgres-airflow
    protocol: TCP
    port: 5432
    targetPort: 5432

Deploy it by running:
kubectl apply -f postgres.yaml

and check that the pod and service are up:
kubectl get pods
kubectl get services
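
To double-check that Postgres accepts connections, you can open a one-off psql session inside the pod. This is just a quick sanity check; the official postgres image trusts local socket connections, so no password is needed:

kubectl exec -it deploy/postgres-airflow -- psql -U airflow -d airflow -c 'SELECT version();'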


2. Initialize the database

Next, run airflow db init once as a Kubernetes Job, defined in airflow-init-db.yaml, to create the Airflow metadata tables in Postgres.

kind: Job
apiVersion: batch/v1
metadata:
  name: airflow-init-db
spec:
  template:
    spec:
      containers:
      - name: airflow-init-db
        args:
        - db
        - init
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://airflow:airflow@postgres-airflow:5432/airflow
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: apache/airflow:2.1.4
        imagePullPolicy: Always
      restartPolicy: "Never"


kubectl apply -f airflow-init-db.yaml

Check if the job has completed by running
kubectl get jobs
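
If the job fails, inspect its output with kubectl logs job/airflow-init-db. Also note that the Airflow 2 web UI requires a login and db init creates no users, so once the webserver from step 4 is up, create an admin account (the credentials below are placeholders; pick your own):

kubectl logs job/airflow-init-db

kubectl exec -it deploy/airflow-webserver -- airflow users create \
  --username admin --password admin \
  --firstname Admin --lastname User \
  --role Admin --email admin@example.com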



3. Logs and config

Provision a PersistentVolumeClaim for task logs, so the webserver and scheduler pods can share them. On minikube, the default StorageClass provisions a hostPath volume automatically.

logs-pv.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi

The shared Airflow settings live in a ConfigMap, which will be injected into each container as environment variables.

airflow-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
data:
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres-airflow:5432/airflow
  AIRFLOW__CORE__EXECUTOR: LocalExecutor
  AIRFLOW__CORE__FERNET_KEY: ''
  AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
  AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
  AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
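
One of these settings deserves a note: AIRFLOW__CORE__FERNET_KEY is left empty, which means connection passwords are stored unencrypted in the metadata database. For anything beyond a local experiment, generate a key and paste it into the ConfigMap, e.g. with the snippet from the Airflow docs (requires the cryptography Python package):

python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"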

kubectl apply -f logs-pv.yaml
kubectl apply -f airflow-configmap.yaml
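
Verify that the claim got bound and the ConfigMap exists:

kubectl get pvc airflow-logs
kubectl get configmap airflow-config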


4. Web Server

Define the webserver Deployment and a Service exposing port 8080 in airflow-webserver.yaml. The ConfigMap and the logs volume from step 3 are both wired in here.

kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-webserver
spec:
  replicas: 1
  selector:
    matchLabels:
      deploy: airflow-webserver
  template:
    metadata:
      labels:
        deploy: airflow-webserver
    spec:
      containers:
        - name: airflow-webserver
          image: apache/airflow:2.1.4
          envFrom:
            - configMapRef:
                name: airflow-config
          ports:
            - containerPort: 8080
          command:
            - airflow
            - webserver
          volumeMounts:
            - mountPath: /opt/airflow/logs
              name: logs
      restartPolicy: Always
      volumes:
        - name: logs
          persistentVolumeClaim:
            claimName: airflow-logs
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-svc
spec:
  selector:
    deploy: airflow-webserver
  ports:
    - name: web
      protocol: TCP
      targetPort: 8080
      port: 8080

kubectl apply -f airflow-webserver.yaml
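
Once the pod is running, forward the service port to your machine and open http://localhost:8080 in a browser, logging in with the account created in step 2:

kubectl port-forward svc/airflow-webserver-svc 8080:8080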


5. Scheduler

Finally, define the scheduler in airflow-scheduler.yaml. Next to the scheduler container, a dag-sync sidecar periodically pulls DAG files from S3 into a shared emptyDir volume, so new DAGs show up without rebuilding the image.

kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      deploy: airflow-scheduler
  template:
    metadata:
      labels:
        deploy: airflow-scheduler
    spec:
      containers:
        - name: airflow-scheduler
          image: apache/airflow:2.1.4
          envFrom:
            - configMapRef:
                name: airflow-config
          command:
            - airflow
            - scheduler
          volumeMounts:
            - name: dags
              mountPath: /opt/airflow/dags
            - name: logs
              mountPath: /opt/airflow/logs
        - name: dag-sync
          image: amazon/aws-cli
          env:
            - name: LOCATION
              value: "s3://<s3 location>"
            - name: SLEEP_INTERVAL
              value: "60"
          command:
            - sh
            - "-c"
            - 'while true; do aws s3 sync "$LOCATION" /opt/airflow/dags --delete --no-sign-request; sleep "$SLEEP_INTERVAL"; done'
          volumeMounts:
            - name: dags
              mountPath: /opt/airflow/dags
      restartPolicy: Always
      volumes:
        - name: logs
          persistentVolumeClaim:
            claimName: airflow-logs
        - name: dags
          emptyDir: {}

kubectl apply -f airflow-scheduler.yaml
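
Check that both containers in the scheduler pod come up, and that DAGs are synced and parsed:

kubectl get pods
kubectl logs deploy/airflow-scheduler -c dag-sync
kubectl exec deploy/airflow-scheduler -c airflow-scheduler -- airflow dags list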
