Implement a task orchestrator (2)
This follows a previous post Implement a task orchestrator (1).
As second part of the task orchestrator, we will implement:
- Scheduler
Scheduler is a standalone application that continuously checks existing pipelines and schedules jobs.
Before we create a scheduler, we first want to get all the existing pipelines and parse them into a format that can be handled by the scheduler.
First, we create a parse module that imports a pipeline file as a module and collects the Pipeline objects defined in it.
import importlib.machinery
import importlib.util
import sys
import time

from model.Pipeline import Pipeline


def parse(filepath):
    """Import *filepath* as a fresh module and return the Pipeline objects it defines.

    A timestamp is appended to the module name so repeated parses of the same
    file load as distinct modules instead of hitting a stale sys.modules entry.
    """
    mod_name = f"{filepath}_{time.time()}"
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    # Register before executing so imports inside the file resolve normally.
    sys.modules[spec.name] = module
    loader.exec_module(module)
    # Collect every module-level attribute that is a Pipeline instance.
    return [attr for attr in module.__dict__.values() if isinstance(attr, Pipeline)]
Then we add a function that parses every Python file in a folder and gathers all of the pipelines they define.
import glob
import importlib.machinery
import importlib.util
import os
import sys
import time

import croniter as cron

from workflow.model.Pipeline import Pipeline


def get_all_pipelines(folder):
    """Parse every top-level .py file in *folder* and return all Pipelines found."""
    pipelines = []
    for filepath in glob.glob(os.path.join(folder, '*.py')):
        pipelines.extend(parse(filepath))
    return pipelines


def parse(filepath):
    """Import *filepath* as a fresh module and return the Pipeline objects it defines."""
    # Unique module name per call so re-parsing picks up on-disk changes.
    mod_name = f"{filepath}_{time.time()}"
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    # Register before executing so imports inside the file resolve normally.
    sys.modules[spec.name] = module
    loader.exec_module(module)
    return [attr for attr in module.__dict__.values() if isinstance(attr, Pipeline)]
Scheduler
Now we can create a scheduler that takes a folder and a task queue as input.
We also add a method that collects all the pipelines and their schedules, computing each pipeline's next run time with the croniter library.
import croniter as cron


class Scheduler:
    """Watches a folder of pipeline files and tracks each pipeline's next run time."""

    def __init__(self, folder, queue):
        self.folder = folder      # directory scanned for pipeline definitions
        self.schedules = []       # (pipeline, next_run_ts, cron_iterator) triples
        self.task_queue = queue   # queue that receives runnable tasks

    def get_schedules(self):
        """Rebuild the schedule list from the pipelines currently in the folder."""
        pipelines = get_all_pipelines(self.folder)
        self.schedules = []
        for pipeline in pipelines:
            print(pipeline.name, pipeline.schedule)
            # croniter with no base time starts from "now" — TODO confirm desired.
            cron_iter = cron.croniter(pipeline.schedule)
            next_run = cron_iter.get_next()
            self.schedules.append((pipeline, next_run, cron_iter))
Now we can add a run method that re-checks the pipelines and schedules every minute and runs any pipeline that is due, which puts its tasks in the queue.
import time

import croniter as cron


class Scheduler:
    """Standalone loop that scans a folder of pipeline files and enqueues due runs.

    ``schedules`` holds (pipeline, next_run_timestamp, cron_iterator) triples,
    kept sorted by next_run_timestamp inside :meth:`run`.
    """

    def __init__(self, folder, queue):
        self.folder = folder      # directory scanned for pipeline definitions
        self.schedules = []       # (pipeline, next_run_ts, cron_iterator) triples
        self.task_queue = queue   # queue that pipeline.run() pushes tasks onto

    def get_schedules(self):
        """Re-read the folder and rebuild the schedule list from scratch."""
        pipelines = get_all_pipelines(self.folder)
        self.schedules = []
        for p in pipelines:
            print(p.name, p.schedule)
            # croniter with no base time starts from "now" — TODO confirm desired.
            schedule_iter = cron.croniter(p.schedule)
            self.schedules.append((p, schedule_iter.get_next(), schedule_iter))

    def run(self):
        """Poll forever: refresh schedules once a minute, fire pipelines as they come due."""
        i = 0
        while True:
            if i % 60 == 0:
                # With the 1-second sleep below this re-reads the folder
                # roughly once a minute, picking up new/changed pipelines.
                self.get_schedules()
            self.schedules.sort(key=lambda entry: entry[1])
            # Iterate a copy: due entries are popped from the front (the list
            # is sorted, so a due entry is always at index 0) and re-appended
            # with their next fire time.
            for pipeline, next_run, schedule_iter in self.schedules[:]:
                if next_run > time.time():
                    break  # sorted list: nothing later can be due yet
                print(time.time(), next_run)
                pipeline.run(self.task_queue)
                self.schedules.pop(0)
                self.schedules.append(
                    (pipeline, schedule_iter.get_next(), schedule_iter)
                )
            time.sleep(1)
            i += 1
Comments
Post a Comment