Implement a task orchestrator (1)
What is a task orchestrator?
Orchestration is the coordination and management of different systems, services, and processes. A task orchestrator orchestrates multiple tasks, which can have an execution order, schedules, and complex dependencies. One of the most commonly used orchestrators is Airflow.
What to build?
The goal of this exercise is to implement a task orchestrator in Python, so that we can create a pipeline of multiple tasks with a schedule, as below, just like in Airflow. In this setup, a scheduler reads all the pipelines in a pre-defined folder and executes them on their schedules.
pipeline = Pipeline(
    name='test',
    schedule='0 0 * * *'
)

def print_test1():
    print('test1')

def print_test2():
    print('test2')

task_1 = PythonTask(
    'print_test1',
    python_callable=print_test1,
    pipeline=pipeline
)

task_2 = PythonTask(
    'print_test2',
    python_callable=print_test2,
    pipeline=pipeline
)

task_1.set_downstream(task_2)
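The scheduler itself is out of scope for this part, but as a rough sketch of the loop described above (a sketch, not the final design: is_due is a hypothetical cron check, the pipelines folder layout is an assumption, and task_queue comes from the executor we build later):

import importlib.util
import time
from pathlib import Path

def load_pipelines(folder='pipelines'):
    # import every file in the folder; executing it builds its Pipeline
    pipelines = []
    for path in Path(folder).glob('*.py'):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        pipelines.append(module.pipeline)  # assumes each file defines `pipeline`
    return pipelines

while True:
    for pipeline in load_pipelines():
        if is_due(pipeline.schedule):  # hypothetical cron check, e.g. via croniter
            pipeline.run(task_queue)   # task_queue is provided by the executor
    time.sleep(60)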
As the first part of the task orchestrator, we can implement:
- Task: each task defines its type, name, details, and which pipeline it belongs to.
- Pipeline: an executable unit with a name, a schedule, tasks, and the dependencies between those tasks.
- Executor (worker): a process that actually executes the tasks.
Pipeline
First we want to define a Pipeline class.
To manage dependencies easily, we will use a directed graph from the networkx library.
import networkx as nx
import pickle
import marshal

class Pipeline:
    def __init__(self, name, schedule):
        self.name = name
        self.schedule = schedule
        self.G = nx.DiGraph()
Before we run any tasks, we first want to validate the workflow, as the stream must not contain cycles. We can also add a display_stream method to visualize the whole stream.
import networkx as nx
import pickle
import marshal

class Pipeline:
    def __init__(self, name, schedule):
        self.name = name
        self.schedule = schedule
        self.G = nx.DiGraph()

    def validate_stream(self):
        cycles = list(nx.simple_cycles(self.G))
        if len(cycles) != 0:
            raise Exception('The pipeline cannot contain a cycle.\nCheck the task dependencies.')
        return True

    def display_stream(self):
        node_labels = {node: node.name for node in self.G.nodes()}
        nx.draw_networkx(self.G, arrows=True, labels=node_labels)
Now we actually want to run the tasks.
Because we do not have a database yet, let's assume the scheduler provides a task queue, which is passed to the workers.
We can go through the tasks, check whether all of a task's parent tasks have completed, and put the ready task in the queue.
import time

import networkx as nx
import pickle
import marshal

class Pipeline:
    def __init__(self, name, schedule):
        self.name = name
        self.schedule = schedule
        self.G = nx.DiGraph()

    def validate_stream(self):
        cycles = list(nx.simple_cycles(self.G))
        if len(cycles) != 0:
            raise Exception('The pipeline cannot contain a cycle.\nCheck the task dependencies.')
        return True

    def display_stream(self):
        node_labels = {node: node.name for node in self.G.nodes()}
        nx.draw_networkx(self.G, arrows=True, labels=node_labels)

    def run(self, task_queue):
        self.validate_stream()
        print(f"run pipeline {self.name}")
        sorted_tasks = list(nx.topological_sort(self.G))
        while sorted_tasks:
            for task in sorted_tasks[:]:
                if self.G.in_edges(task):
                    # a task is ready only when every incoming edge has
                    # weight 0, i.e. all of its parents have completed
                    weights = [self.G[e[0]][e[1]]['weight'] for e in self.G.in_edges(task)]
                    if sum(weights) != 0:
                        continue
                task_queue.put(task)
                sorted_tasks.remove(task)
            # wait a bit before polling again for newly completed parents
            time.sleep(5)
Our simple Pipeline class is now complete!
Task
Now let's implement a Task class.
First we need a base Task class; then we can create different task types such as PythonTask, HTTPTask, etc. (like Operators in Airflow) on top of it.
First we initialize the Task instance with the given name and pipeline.
Because we implemented Pipeline as a graph, we also add the task instance itself as a node of that graph.
class Task:
    def __init__(self, name, pipeline):
        self.name = name
        self.pipeline = pipeline
        self.pipeline.G.add_node(self)
Then we can add methods to set dependencies.
Setting a downstream or upstream task adds an edge between the two nodes (tasks), and the edge weight indicates whether the parent task has already completed (1 = pending, 0 = done).
We leave the run method empty for now so that subclasses can override it.
class Task:
    def __init__(self, name, pipeline):
        self.name = name
        self.pipeline = pipeline
        self.pipeline.G.add_node(self)

    def set_downstream(self, task):
        self.pipeline.G.add_edge(self, task)
        self.pipeline.G[self][task]['weight'] = 1

    def set_upstream(self, task):
        self.pipeline.G.add_edge(task, self)
        self.pipeline.G[task][self]['weight'] = 1

    def complete_task(self):
        # mark all outgoing edges as satisfied so downstream tasks can run
        for e in self.pipeline.G.out_edges(self):
            self.pipeline.G[e[0]][e[1]]['weight'] = 0

    def run(self):
        pass
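With these methods in place, the weight bookkeeping can be checked directly (a small sketch using the classes above):

pipeline = Pipeline(name='demo', schedule='0 0 * * *')
a = Task('a', pipeline)
b = Task('b', pipeline)
a.set_downstream(b)

print(pipeline.G[a][b]['weight'])  # 1: b is blocked until a completes
a.complete_task()
print(pipeline.G[a][b]['weight'])  # 0: b is now ready to run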
Now, using this Task class, we can implement a PythonTask class.
import marshal
import time
import types

class PythonTask(Task):
    def __init__(self, name, python_callable, pipeline):
        Task.__init__(self, name, pipeline)
        # serialize the function's bytecode so the task object can be
        # sent to a worker process (function objects are not always picklable)
        self.callable = marshal.dumps(python_callable.__code__)

    def run(self):
        print(f"run task: {self.name} - {time.time()}")
        code = marshal.loads(self.callable)
        func = types.FunctionType(code, globals(), "test")
        func()
        self.complete_task()
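One caveat worth knowing: marshal serializes only the function's code object, not its closure, default arguments, or imported globals, so this approach works only for simple, self-contained callables. A standalone round trip looks like this:

import marshal
import types

def greet():
    print('hello')

payload = marshal.dumps(greet.__code__)  # bytes, safe to send to another process
restored = types.FunctionType(marshal.loads(payload), globals(), 'greet')
restored()  # prints 'hello'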
Executor
For the executor, we create multiple worker processes that all consume from a shared queue, so tasks are picked up by whichever process is not busy.
import multiprocessing as mp
import time

class Executor:
    def __init__(self, n=3):
        self.n_process = n
        self.workers = []
        self.task_queue = mp.Queue()

    def run(self):
        # start n worker processes, all consuming from the shared queue
        for i in range(self.n_process):
            p = mp.Process(target=process_tasks, args=(self.task_queue, i))
            self.workers.append(p)
            p.start()

    def terminate(self):
        for p in self.workers:
            p.join()

def process_tasks(task_queue, id):
    # worker loop: poll the queue and run whatever task arrives
    while True:
        if task_queue.empty():
            time.sleep(1)
            continue
        task = task_queue.get()
        print('received', id, task.name)
        task.run()
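Putting it all together, here is a minimal single-process smoke test (a sketch: a thread stands in for a worker so that complete_task() updates the same graph the pipeline polls; with the multiprocessing Executor above, each worker would get its own pickled copy of the task, so completion tracking would need shared state such as a database):

import queue
import threading

pipeline = Pipeline(name='test', schedule='0 0 * * *')

def print_test1():
    print('test1')

def print_test2():
    print('test2')

task_1 = PythonTask('print_test1', python_callable=print_test1, pipeline=pipeline)
task_2 = PythonTask('print_test2', python_callable=print_test2, pipeline=pipeline)
task_1.set_downstream(task_2)

task_queue = queue.Queue()

def worker():
    while True:
        task = task_queue.get()
        task.run()  # run() calls complete_task(), unblocking downstream tasks

threading.Thread(target=worker, daemon=True).start()
pipeline.run(task_queue)  # returns once every task has been queued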