diff --git a/splinter.py b/splinter.py index 10bb9b2ea8854d4c8bdff2172c3759853420841e..39a055f637e18161a24d8b0855bfeaba38b69f05 100755 --- a/splinter.py +++ b/splinter.py @@ -30,9 +30,9 @@ class task_status: _task = None _future = None - def __init__(self, index, future): + def __init__(self, task, future): self._future = future - self._index = index + self._task = task def future(self): return self._future @@ -53,8 +53,11 @@ class workflow: self._task_array.append(task) def find_next_task(self): - if len(self._pending_task_array) != 0: - return self._pending_task_array[0] + n_pending_task = len(self._pending_task_array) + if n_pending_task != 0: + for i in range(0, n_pending_task): + if self.are_task_dependencies_satisfied(self._pending_task_array[i]): + return self._pending_task_array[i] return None def completed_task_status(self): @@ -72,6 +75,16 @@ class workflow: return False + def are_task_dependencies_satisfied(self, task): + if len(self._completed_task_array) != 0: + completed_task_id_array = [task.task_id() for task in self._completed_task_array] + for dependent_task in task.dependent_task_array(): + if not dependent_task in completed_task_id_array: + return False + + return True + return True + def is_worker_available(self): if len(self._task_status_array) < self._njob: return True @@ -95,7 +108,6 @@ class workflow: completed_task_status = self.completed_task_status() if completed_task_status != None: print("Job Completed") - print(completed_task_status.future().result()) self._completed_task_array.append( completed_task_status.task()) self._task_status_array.remove(completed_task_status) @@ -107,7 +119,7 @@ if __name__ == "__main__": wf = workflow() wf.add_task(task(1, ["./test/tester", "10"], [])) wf.add_task(task(2, ["./test/tester", "1"], [])) - wf.add_task(task(3, ["./test/tester", "5"], [])) - wf.add_task(task(4, ["./test/tester", "1"], [])) + wf.add_task(task(3, ["./test/tester", "5"], [2])) + wf.add_task(task(4, ["./test/tester", "1"], [1])) wf.execute_workflow(2, 1)