Implement a task orchestrator (1)

What is a task orchestrator?

Orchestration is the coordination and management of different systems, services, and processes. A task orchestrator applies this to multiple tasks, which can have an execution order, schedules, and complex dependencies. One of the most commonly used orchestrators is Airflow.


What to build?

The goal of this exercise is to implement a task orchestrator in Python, so that we can create a pipeline of multiple tasks with a schedule as below, just like in Airflow. In this setup, a scheduler will read all the pipelines in a pre-defined folder and execute them on the right schedules.

pipeline = Pipeline(
    name='test',
    schedule='0 0 * * *'
)

def print_test1():
    print('test1')
    
    
def print_test2():
    print('test2')
    

task_1 = PythonTask(
    'print_test1',
    python_callable=print_test1,
    pipeline=pipeline
)

task_2 = PythonTask(
    'print_test2',
    python_callable=print_test2,
    pipeline=pipeline
)

task_1.set_downstream(task_2)
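
The scheduler itself is not part of this first installment, but to make the setup concrete, its discovery step might look roughly like the sketch below. The pipelines/ folder name and the per-module pipeline attribute are assumptions for illustration, not part of the implementation that follows.

import importlib.util
import pathlib

def load_pipelines(folder='pipelines'):
    # Import every .py file in the folder; each module is expected
    # to define a module-level `pipeline` object like the one above.
    pipelines = []
    for path in pathlib.Path(folder).glob('*.py'):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        pipelines.append(module.pipeline)
    return pipelines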

As the first part of the task orchestrator, we can implement:

  • Task
    For each task, we define the type of task, its name, its details, and which pipeline it belongs to.
  • Pipeline
    A pipeline is an executable unit with a name, a schedule, tasks, and the dependencies between those tasks.
  • Executor (worker)
    A process that actually executes the jobs.

Pipeline

First we want to define a Pipeline class.
To easily manage dependencies, we will use a directed graph from the networkx library.

import networkx as nx

class Pipeline:
    
    def __init__(self, name, schedule):
        self.name = name
        self.schedule = schedule
        self.G = nx.DiGraph()
                

Before we run any tasks, we first want to validate the workflow, as we do not want the stream to have cycles. We can also add a display_stream method to show the whole stream.


import networkx as nx

class Pipeline:
    
    def __init__(self, name, schedule):
        self.name = name
        self.schedule = schedule
        self.G = nx.DiGraph()
                
    def validate_stream(self):
        # The stream must be a DAG: a cycle would block the run loop forever.
        cycles = list(nx.simple_cycles(self.G))
        if cycles:
            raise Exception('The pipeline cannot contain a cycle.\nCheck the task dependencies.')
        return True
    
    def display_stream(self):
        node_labels = {node:node.name for node in self.G.nodes()}
        nx.draw_networkx(self.G, arrows=True, labels=node_labels)
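
As a quick sanity check of validate_stream (a sketch; plain strings stand in for task nodes here, since the Task class only comes later):

p = Pipeline(name='demo', schedule='0 0 * * *')
p.G.add_edge('a', 'b')
p.G.add_edge('b', 'a')  # introduces a cycle: a -> b -> a

try:
    p.validate_stream()
except Exception as e:
    print(e)  # the cycle is rejected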
        

Now we actually want to run the tasks.
Because we do not have a database yet, let's assume there is a task queue provided by the scheduler, which is shared with the workers.

We can go through the tasks, check whether all of a task's parents have completed, and if so put the task in the queue.


import time

import networkx as nx

class Pipeline:
    
    def __init__(self, name, schedule):
        self.name = name
        self.schedule = schedule
        self.G = nx.DiGraph()
                
    def validate_stream(self):
        # The stream must be a DAG: a cycle would block the run loop forever.
        cycles = list(nx.simple_cycles(self.G))
        if cycles:
            raise Exception('The pipeline cannot contain a cycle.\nCheck the task dependencies.')
        return True
    
    def display_stream(self):
        node_labels = {node:node.name for node in self.G.nodes()}
        nx.draw_networkx(self.G, arrows=True, labels=node_labels)
        
    def run(self, task_queue):
        self.validate_stream()
        
        print(f"run pipeline {self.name}")
        
        sorted_tasks = list(nx.topological_sort(self.G))
        
        while sorted_tasks:
            for task in sorted_tasks[:]:
                if self.G.in_edges(task):
                    # An in-edge weight of 1 means the parent task has not
                    # completed yet (the Task class below resets it to 0).
                    weights = [self.G[e[0]][e[1]]['weight'] for e in self.G.in_edges(task)]
                    if sum(weights) != 0:
                        continue
                task_queue.put(task)
                sorted_tasks.remove(task)
            
            # Poll every 5 seconds until every task has been queued.
            time.sleep(5)

Our simple Pipeline class is now complete!

Task

Now let's implement a Task class.
First we need a base Task class; then we can build different task types such as PythonTask, HTTPTask, etc. (like Operators in Airflow) on top of it.

First we initialize the Task instance with the given name and pipeline.
Because we implemented Pipeline as a graph, we also add the task instance itself as a node of the graph.

class Task:
    def __init__(self, name, pipeline):
        self.name = name
        self.pipeline = pipeline
        self.pipeline.G.add_node(self)
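
A task registers itself on construction, so the pipeline's graph grows as tasks are declared. For example (hypothetical names):

p = Pipeline(name='demo', schedule='0 0 * * *')
t = Task('extract', p)
print(p.G.number_of_nodes())  # 1 -- the task added itself as a node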

Then we can add some methods to set dependencies.
Setting a downstream or upstream task adds an edge between two nodes (tasks); the edge weight indicates whether the parent task has completed (1 = pending, 0 = complete).
We leave the run method empty for now, so subclasses can override it.


class Task:
    def __init__(self, name, pipeline):
        self.name = name
        self.pipeline = pipeline
        self.pipeline.G.add_node(self)
        
    def set_downstream(self, task):
        # self -> task; weight 1 marks the dependency as not yet satisfied
        self.pipeline.G.add_edge(self, task)
        self.pipeline.G[self][task]['weight'] = 1
        
    def set_upstream(self, task):
        # task -> self
        self.pipeline.G.add_edge(task, self)
        self.pipeline.G[task][self]['weight'] = 1
        
    def complete_task(self):
        # Mark every outgoing dependency as satisfied
        for e in self.pipeline.G.out_edges(self):
            self.pipeline.G[e[0]][e[1]]['weight'] = 0
            
    def run(self):
        pass


Now, using this Task class, we can implement the PythonTask class. We serialize the function's bytecode with marshal so the callable can travel through a multiprocessing queue to a worker, and rebuild it at run time.

import marshal
import time
import types

class PythonTask(Task):
    def __init__(self, name, python_callable, pipeline):
        Task.__init__(self, name, pipeline)
        # Store the function's bytecode rather than the function object
        self.callable = marshal.dumps(python_callable.__code__)
        
    def run(self):
        print(f"run task: {self.name} - {time.time()}")
        # Rebuild the function from its bytecode and execute it
        code = marshal.loads(self.callable)
        func = types.FunctionType(code, globals(), "test")
        func()
        self.complete_task()
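
To see Pipeline and PythonTask working together before we bring in multiprocessing, here is a single-process sketch. It is an assumption of this demo that a plain queue.Queue is drained by a thread, so complete_task() mutates the same graph the pipeline is polling:

import queue
import threading

demo = Pipeline(name='demo', schedule='0 0 * * *')

def step_1():
    print('step 1')

def step_2():
    print('step 2')

t1 = PythonTask('step_1', python_callable=step_1, pipeline=demo)
t2 = PythonTask('step_2', python_callable=step_2, pipeline=demo)
t1.set_downstream(t2)

q = queue.Queue()

def drain(task_queue, n):
    # Run n tasks as they arrive; complete_task() then unblocks step_2
    for _ in range(n):
        task_queue.get().run()

worker = threading.Thread(target=drain, args=(q, 2))
worker.start()
demo.run(q)  # takes ~10 seconds because of the 5-second polling sleep
worker.join()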

Executor

For the executor, we create multiple worker processes; each one pulls tasks from the queue and executes them.

import multiprocessing as mp
import time

class Executor:
    def __init__(self, n=3):
        self.n_process = n
        self.workers = []
        self.task_queue = mp.Queue()
        
    def run(self):
        for i in range(self.n_process):
            p = mp.Process(target=process_tasks, args=(self.task_queue,i))
            self.workers.append(p)
            p.start()
            
    def terminate(self):
        # Workers loop forever, so stop them before joining
        for p in self.workers:
            p.terminate()
            p.join()

def process_tasks(task_queue, worker_id):
    # Worker loop: poll the queue and run whatever task arrives
    while True:
        if task_queue.empty():
            time.sleep(1)
            continue
        task = task_queue.get()
        print('received', worker_id, task.name)
        task.run()
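
Putting it all together, here is a minimal end-to-end sketch. Note that mp.Queue pickles each task, so a worker's complete_task() updates its own copy of the graph; since we have no shared database yet, this demo sticks to two independent tasks:

if __name__ == '__main__':
    demo = Pipeline(name='demo', schedule='0 0 * * *')

    def hello():
        print('hello from a worker')

    PythonTask('hello_1', python_callable=hello, pipeline=demo)
    PythonTask('hello_2', python_callable=hello, pipeline=demo)

    executor = Executor(n=2)
    executor.run()
    demo.run(executor.task_queue)  # no dependencies, so both tasks queue immediately
    time.sleep(2)                  # give the workers a moment to finish
    executor.terminate()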




