Implement a task orchestrator (2)

This follows the previous post, Implement a task orchestrator (1).

As the second part of the task orchestrator, we will implement:

  • Scheduler
    The scheduler is a standalone application that continuously checks the existing pipelines and schedules jobs.

Before we create the scheduler, we first need to collect all the existing pipelines and parse them into a format the scheduler can handle.

First, we create a parse function that imports a file as a module and collects the Pipeline objects defined in it.

import importlib.machinery
import importlib.util
import sys, time

from model.Pipeline import Pipeline

def parse(filepath):
    # Use a unique module name (path plus timestamp) so the same file can be
    # re-imported later without clashing with an earlier entry in sys.modules.
    mod_name = f"{filepath}_{time.time()}"
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    loader.exec_module(module)

    # Collect every Pipeline instance defined at the top level of the file.
    pipelines = []
    for attr in module.__dict__.values():
        if isinstance(attr, Pipeline):
            pipelines.append(attr)

    return pipelines
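
For example, assuming the pipeline files from part 1 live in a dags/ folder (the folder and file names here are only illustrative), parsing a single file looks like this. We only rely on the name and schedule attributes that the scheduler uses below.

# Hypothetical usage -- 'dags/daily_report.py' is an example path.
pipelines = parse('dags/daily_report.py')
for p in pipelines:
    print(p.name, p.schedule)   # e.g. daily_report  0 2 * * *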

Then we add a get_all_pipelines function that parses every Python file in a folder and collects all the pipelines.


import glob, os
import importlib.machinery
import importlib.util
import sys, time

from workflow.model.Pipeline import Pipeline

def get_all_pipelines(folder):
    # Parse every Python file in the folder and collect their pipelines.
    pipelines = []
    files = glob.glob(os.path.join(folder, '*.py'))
    for filepath in files:
        pipelines.extend(parse(filepath))
    return pipelines

def parse(filepath):
    # Unique module name so a file can be re-imported when it changes.
    mod_name = f"{filepath}_{time.time()}"
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    loader.exec_module(module)

    pipelines = []
    for attr in module.__dict__.values():
        if isinstance(attr, Pipeline):
            pipelines.append(attr)

    return pipelines
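
One detail worth calling out: parse gives every file a unique module name built from its path and the current timestamp. Since the scheduler re-reads the folder every minute, this presumably avoids clashing with the module already registered in sys.modules and lets edited pipeline files be picked up on the next refresh.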


Scheduler

Now we can create a Scheduler class that takes a folder and a task queue as input.
We also add a get_schedules method that collects all the pipelines and uses the croniter library to compute the next run time for each schedule.

import croniter as cron

class Scheduler:
    def __init__(self, folder, queue):
        self.folder = folder        # folder containing the pipeline files
        self.schedules = []         # list of (pipeline, next_run_time, croniter) tuples
        self.task_queue = queue     # queue shared with the workers
    
    def get_schedules(self):
        # get_all_pipelines comes from the parse module defined above.
        pipelines = get_all_pipelines(self.folder)
        self.schedules = []
        for p in pipelines:
            print(p.name, p.schedule)
            # croniter yields upcoming run times for the cron expression.
            schedule_iter = cron.croniter(p.schedule)
            self.schedules.append((p, schedule_iter.get_next(), schedule_iter))
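
The value stored in the middle of each tuple is what croniter's get_next() returns by default: a plain unix timestamp (a float), which is what run() below compares against time.time(). A quick illustration, with an arbitrary cron expression:

import time
import croniter as cron

it = cron.croniter('*/5 * * * *')            # every five minutes, starting from now
next_run = it.get_next()                     # unix timestamp of the next run (float)
print(next_run - time.time(), 'seconds until the next run')
print(it.get_next())                         # the run after that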
        

Now we can add a run method that refreshes the pipelines and schedules every minute, checks for due schedules every second, and runs each due pipeline, which puts its tasks in the queue.

import time

import croniter as cron

class Scheduler:
    def __init__(self, folder, queue):
        self.folder = folder        # folder containing the pipeline files
        self.schedules = []         # list of (pipeline, next_run_time, croniter) tuples
        self.task_queue = queue     # queue shared with the workers
    
    def get_schedules(self):
        # get_all_pipelines comes from the parse module defined above.
        pipelines = get_all_pipelines(self.folder)
        self.schedules = []
        for p in pipelines:
            print(p.name, p.schedule)
            schedule_iter = cron.croniter(p.schedule)
            self.schedules.append((p, schedule_iter.get_next(), schedule_iter))
    
    def run(self):
        i = 0
        while True:
            # Reload the pipeline files roughly once a minute to pick up changes.
            if i % 60 == 0:
                self.get_schedules()
            
            # Sort by next run time so the earliest schedules come first.
            self.schedules.sort(key=lambda x: x[1])
            for nextrun in self.schedules[:]:
                if nextrun[1] <= time.time():
                    print(time.time(), nextrun[1])
                    pipeline = nextrun[0]
                    schedule_iter = nextrun[2]
                    # Running the pipeline puts its tasks in the shared queue.
                    pipeline.run(self.task_queue)
                    # The due entry is always at the front of the sorted list,
                    # so pop it and re-append the pipeline with its next run time.
                    self.schedules.pop(0)
                    self.schedules.append((pipeline, schedule_iter.get_next(), schedule_iter))
                else:
                    # The list is sorted, so nothing later is due yet.
                    break
            
            time.sleep(1)
            i += 1
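
Finally, to tie the pieces together, here is a minimal, hypothetical entry point. The dags folder name and the multiprocessing.Queue are assumptions made for this sketch; in practice the queue would be whichever task queue the workers from the first post consume.

# Hypothetical entry point -- folder name and queue type are assumptions.
from multiprocessing import Queue

if __name__ == '__main__':
    task_queue = Queue()                      # shared with the workers from part 1
    scheduler = Scheduler('dags', task_queue)
    scheduler.run()                           # blocks, checking schedules every second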
