How to Run Airflow on Kubernetes
This post walks through the steps to run the Airflow webserver and scheduler on Kubernetes using minikube.
If you don't have minikube installed yet, please have a look at this post first.
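Once minikube is installed, start a local cluster. A minimal sketch; the resource sizes here are assumptions, so adjust them to your machine:
minikube start --cpus 4 --memory 8192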
As introduced in this post, Airflow consists of several components, so we need to spin up multiple pods. For this exercise, we will create services for
- Webserver
- Scheduler
- Postgres DB (meta database)
For task execution, we will use the LocalExecutor for now, so tasks run as subprocesses of the scheduler and no separate worker pods are needed.
Deploy Airflow on Kubernetes
1. Deploy Postgres database
We first need to have Postgres up and running. We can create a pod and a service by defining them in postgres.yaml.
kind: Deployment
apiVersion: apps/v1
metadata:
  name: postgres-airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      deploy: postgres-airflow
  template:
    metadata:
      labels:
        name: postgres-airflow
        deploy: postgres-airflow
    spec:
      restartPolicy: Always
      containers:
        - name: postgres
          image: postgres:13.4
          ports:
            - containerPort: 5432
              protocol: TCP
          volumeMounts:
            - name: dbvol
              mountPath: /var/lib/postgresql/data/pgdata
              subPath: pgdata
          env:
            - name: POSTGRES_USER
              value: airflow
            - name: POSTGRES_PASSWORD
              value: airflow
            - name: POSTGRES_DB
              value: airflow
            - name: PGDATA
              value: /var/lib/postgresql/data/pgdata
            - name: POD_IP
              valueFrom: { fieldRef: { fieldPath: status.podIP } }
          livenessProbe:
            initialDelaySeconds: 60
            timeoutSeconds: 5
            failureThreshold: 5
            exec:
              command:
                - /bin/sh
                - -c
                - exec pg_isready --host $POD_IP || if [ "$(psql -qtAc 'SELECT pg_is_in_recovery()' --host $POD_IP -U airflow)" != "f" ]; then exit 0; else exit 1; fi
          readinessProbe:
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 5
            exec:
              command:
                - /bin/sh
                - -c
                - exec pg_isready --host $POD_IP
          resources:
            requests:
              memory: .5Gi
              cpu: .5
      volumes:
        - name: dbvol
          emptyDir: {}
---
kind: Service
apiVersion: v1
metadata:
  name: postgres-airflow
spec:
  selector:
    name: postgres-airflow
  ports:
    - name: postgres-airflow
      protocol: TCP
      port: 5432
      targetPort: 5432
and deploy by running the command
kubectl apply -f postgres.yaml
and check by
kubectl get pods
kubectl get services
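To double-check that Postgres actually accepts connections, you can run psql inside the pod (listing databases with \l is just a quick sanity check):
kubectl exec -it deploy/postgres-airflow -- psql -U airflow -c '\l'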
2. Init database in airflow-init-db.yaml
kind: Job
apiVersion: batch/v1
metadata:
  name: airflow-init-db
spec:
  template:
    spec:
      containers:
        - name: airflow-init-db
          args:
            - db
            - init
          env:
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              value: postgresql://airflow:airflow@postgres-airflow:5432/airflow
            - name: AIRFLOW__CORE__EXECUTOR
              value: LocalExecutor
          image: apache/airflow:2.1.4
          imagePullPolicy: Always
      restartPolicy: "Never"
kubectl apply -f airflow-init-db.yaml
Check if the job has completed by running
kubectl get jobs
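If the job stays at 0/1 completions, the container logs usually explain why:
kubectl logs job/airflow-init-db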
3. Logs and config
Provision a PersistentVolumeClaim for logs.
logs-pv.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
airflow-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
data:
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres-airflow:5432/airflow
  AIRFLOW__CORE__EXECUTOR: LocalExecutor
  AIRFLOW__CORE__FERNET_KEY: ''
  AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
  AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
  AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
kubectl apply -f logs-pv.yaml
kubectl apply -f airflow-configmap.yaml
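Note that AIRFLOW__CORE__FERNET_KEY is left empty above, so connection passwords end up unencrypted in the metadata database. For anything beyond a throwaway setup, generate a key and paste it into the ConfigMap; the usual one-liner (assuming the cryptography package is installed) is:
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"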
4. Web Server
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-webserver
spec:
  replicas: 1
  selector:
    matchLabels:
      deploy: airflow-webserver
  template:
    metadata:
      labels:
        deploy: airflow-webserver
    spec:
      containers:
        - name: airflow-webserver
          image: apache/airflow:2.1.4
          envFrom:
            - configMapRef:
                name: airflow-config
          ports:
            - containerPort: 8080
          command:
            - airflow
            - webserver
          volumeMounts:
            - mountPath: /opt/airflow/logs
              name: logs
      restartPolicy: Always
      volumes:
        - name: logs
          persistentVolumeClaim:
            claimName: airflow-logs
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-svc
spec:
  selector:
    deploy: airflow-webserver
  ports:
    - name: web
      protocol: TCP
      targetPort: 8080
      port: 8080
kubectl apply -f airflow-webserver.yaml
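The webserver is up but has no users yet. A minimal sketch for local testing (the admin/admin credentials here are placeholders, pick your own):
kubectl exec -it deploy/airflow-webserver -- airflow users create \
  --username admin --password admin \
  --firstname Air --lastname Flow \
  --role Admin --email admin@example.com
Then forward the service port and open http://localhost:8080 in the browser:
kubectl port-forward svc/airflow-webserver-svc 8080:8080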
5. Scheduler
The Deployment below runs the scheduler together with a dag-sync sidecar, which periodically copies DAG files from S3 into an emptyDir volume shared by both containers.
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      deploy: airflow-scheduler
  template:
    metadata:
      labels:
        deploy: airflow-scheduler
    spec:
      containers:
        - name: airflow-scheduler
          image: apache/airflow:2.1.4
          envFrom:
            - configMapRef:
                name: airflow-config
          command:
            - airflow
            - scheduler
          volumeMounts:
            - name: dags
              mountPath: /opt/airflow/dags
            - name: logs
              mountPath: /opt/airflow/logs
        - name: dag-sync
          image: amazon/aws-cli
          env:
            - name: LOCATION
              value: "s3://<s3 location>"
            - name: SLEEP_INTERVAL
              value: "60"
          command:
            - sh
            - "-c"
- "while true; do aws s3 sync s3://<s3 location> . --delete --no-sign-request --acl public-read done"
          volumeMounts:
            - name: dags
              mountPath: /opt/airflow/dags
      restartPolicy: Always
      volumes:
        - name: logs
          persistentVolumeClaim:
            claimName: airflow-logs
        - name: dags
          emptyDir: {}
kubectl apply -f airflow-scheduler.yaml
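To verify that both containers in the pod came up, tail their logs (container names as defined in the manifest above):
kubectl logs deploy/airflow-scheduler -c airflow-scheduler -f
kubectl logs deploy/airflow-scheduler -c dag-sync --tail=20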