Implement a task orchestrator (2)

This post follows the previous post, Implement a task orchestrator (1).

In this second part of the task orchestrator, we will implement:

  • Scheduler
    The scheduler is a standalone application that continuously checks the existing pipelines and schedules their jobs.

Before we create the scheduler, we need to collect all the existing pipelines and parse them into a format the scheduler can handle.

First, we create a parse function that imports a Python file as a module and collects the Pipeline objects defined in it.

Then we add a function that parses every .py file in a folder and collects all of their pipelines.

import glob, os
import importlib.machinery, importlib.util, sys, time
from workflow.model.Pipeline import Pipeline

def get_all_pipelines(folder):
    # Parse every .py file in the folder and collect all their pipelines
    pipelines = []
    files = glob.glob(os.path.join(folder, '*.py'))
    for filepath in files:
        pipelines.extend(parse(filepath))
    return pipelines

def parse(filepath):
    # A unique module name, so re-parsing a changed file loads a fresh copy
    mod_name = f"{filepath}_{time.time()}"
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    sys.modules[mod_name] = module
    loader.exec_module(module)
    # Keep only the module-level objects that are Pipeline instances
    pipelines = []
    for attr in module.__dict__.values():
        if isinstance(attr, Pipeline):
            pipelines.append(attr)
    return pipelines
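The dynamic loading in parse can be tried in isolation. The sketch below is self-contained: Pipeline here is a hypothetical stand-in for workflow.model.Pipeline (holding only a name and a cron string), registered as a fake demo_model module so that the pipeline file can import it the same way a real pipeline file imports the model. The file name daily.py and the pipeline names are made up for the demo.

```python
import importlib.machinery
import importlib.util
import os
import sys
import tempfile
import time
import types


class Pipeline:
    """Hypothetical stand-in for workflow.model.Pipeline."""

    def __init__(self, name, schedule):
        self.name = name
        self.schedule = schedule


# Expose the stand-in class under an importable name (hypothetical "demo_model"),
# mirroring how workflow.model.Pipeline is importable in the real project.
demo_model = types.ModuleType("demo_model")
demo_model.Pipeline = Pipeline
sys.modules["demo_model"] = demo_model


def parse(filepath):
    # A unique module name per load, so a changed file is executed again
    mod_name = f"pipelines_{time.time_ns()}"
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    sys.modules[mod_name] = module
    loader.exec_module(module)
    # Collect only module-level Pipeline instances
    return [v for v in vars(module).values() if isinstance(v, Pipeline)]


# Write a pipeline definition file into a temporary folder and parse it.
with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, "daily.py")
    with open(path, "w") as f:
        f.write(
            "from demo_model import Pipeline\n"
            "etl = Pipeline('etl', '0 2 * * *')\n"
            "report = Pipeline('report', '*/15 * * * *')\n"
            "not_a_pipeline = 42\n"
        )
    found = parse(path)

print(sorted(p.name for p in found))  # ['etl', 'report']
```

Because the file is executed as ordinary Python, any code at module level runs during parsing, so pipeline files are best kept to definitions only.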


Now we can create a scheduler that takes a folder of pipeline files and a task queue as input.
We also add a get_schedules method that loads all the pipelines and uses the croniter library to compute each pipeline's next run time, so the schedules can be sorted by that time.

Finally, we add a run method that keeps checking the schedules, reloads the pipelines every minute, and runs each pipeline that is due, which puts its tasks in the queue.

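import time
import croniter as cron

# get_all_pipelines() is the folder-parsing function defined above.

class Scheduler:
    def __init__(self, folder, queue):
        self.folder = folder
        self.schedules = []  # (pipeline, next run timestamp, croniter iterator)
        self.task_queue = queue

    def get_schedules(self):
        pipelines = get_all_pipelines(self.folder)
        self.schedules = []
        for p in pipelines:
            print(p, p.schedule)
            # croniter starts from the current time; get_next() returns
            # the next run time as a Unix timestamp (float)
            schedule_iter = cron.croniter(p.schedule)
            self.schedules.append((p, schedule_iter.get_next(), schedule_iter))

    def run(self):
        i = 0
        while True:
            # Reload the pipeline files once a minute to pick up changes
            if i % 60 == 0:
                self.get_schedules()
            self.schedules.sort(key=lambda x: x[1])
            # Iterate over a copy because the list is modified inside the loop
            for nextrun in self.schedules[:]:
                if nextrun[1] <= time.time():
                    print(time.time(), nextrun[1])
                    pipeline = nextrun[0]
                    schedule_iter = nextrun[2]
                    # Assumed Pipeline API (from part 1): run() puts the tasks on the queue
                    pipeline.run(self.task_queue)
                    # Replace the entry with the pipeline's following run time
                    self.schedules.remove(nextrun)
                    self.schedules.append((pipeline, schedule_iter.get_next(), schedule_iter))
            time.sleep(1)  # one pass per second, so i counts seconds
            i += 1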
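The scheduler re-sorts the whole schedule list on every pass. A common alternative, not the approach used in this post, is to keep the schedules in a heap keyed by next run time, so each tick only looks at the earliest entries. The sketch below makes several assumptions so it runs deterministically without third-party packages: a hypothetical IntervalSchedule class (fixed intervals) stands in for croniter, the current time is passed into tick instead of read from time.time(), and a plain list stands in for the task queue.

```python
import heapq
import itertools


class IntervalSchedule:
    """Hypothetical stand-in for croniter: fires every `seconds` after `start`."""

    def __init__(self, start, seconds):
        self.current = start
        self.seconds = seconds

    def get_next(self):
        # Advance to the next run time, as croniter's get_next() does
        self.current += self.seconds
        return self.current


class HeapScheduler:
    def __init__(self, queue):
        self.task_queue = queue
        self.heap = []  # entries: (next_run, tie_breaker, name, schedule_iter)
        self.counter = itertools.count()

    def add(self, name, schedule_iter):
        entry = (schedule_iter.get_next(), next(self.counter), name, schedule_iter)
        heapq.heappush(self.heap, entry)

    def tick(self, now):
        # The earliest run is always at heap[0], so stop at the first future entry
        while self.heap and self.heap[0][0] <= now:
            _, _, name, schedule_iter = heapq.heappop(self.heap)
            self.task_queue.append(name)  # stand-in for running the pipeline
            self.add(name, schedule_iter)  # reschedule at its following run time


queue = []
s = HeapScheduler(queue)
s.add("report", IntervalSchedule(start=0, seconds=60))
s.add("etl", IntervalSchedule(start=0, seconds=150))
for now in range(300):  # simulate the first 300 seconds, one tick per second
    s.tick(now)
print(queue)  # ['report', 'report', 'etl', 'report', 'report']
```

The counter in each heap entry breaks ties between pipelines due at the same moment, so heapq never has to compare the pipeline names or schedule objects themselves.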
