Skip to content
Snippets Groups Projects
Unverified Commit 13475b9e authored by Jayesh Badwaik's avatar Jayesh Badwaik
Browse files

- dependencies now work

parent 728384be
Branches
Tags v0.4
No related merge requests found
...@@ -30,9 +30,9 @@ class task_status: ...@@ -30,9 +30,9 @@ class task_status:
_task = None _task = None
_future = None _future = None
def __init__(self, index, future): def __init__(self, task, future):
self._future = future self._future = future
self._index = index self._task = task
def future(self): def future(self):
return self._future return self._future
...@@ -53,8 +53,11 @@ class workflow: ...@@ -53,8 +53,11 @@ class workflow:
self._task_array.append(task) self._task_array.append(task)
def find_next_task(self): def find_next_task(self):
if len(self._pending_task_array) != 0: n_pending_task = len(self._pending_task_array)
return self._pending_task_array[0] if n_pending_task != 0:
for i in range(0, n_pending_task):
if self.are_task_dependencies_satisfied(self._pending_task_array[i]):
return self._pending_task_array[i]
return None return None
def completed_task_status(self): def completed_task_status(self):
...@@ -72,6 +75,16 @@ class workflow: ...@@ -72,6 +75,16 @@ class workflow:
return False return False
def are_task_dependencies_satisfied(self, task):
if len(self._completed_task_array) != 0:
completed_task_id_array = [task.task_id() for task in self._completed_task_array]
for dependent_task in task.dependent_task_array():
if not dependent_task in completed_task_id_array:
return False
return True
return True
def is_worker_available(self): def is_worker_available(self):
if len(self._task_status_array) < self._njob: if len(self._task_status_array) < self._njob:
return True return True
...@@ -95,7 +108,6 @@ class workflow: ...@@ -95,7 +108,6 @@ class workflow:
completed_task_status = self.completed_task_status() completed_task_status = self.completed_task_status()
if completed_task_status != None: if completed_task_status != None:
print("Job Completed") print("Job Completed")
print(completed_task_status.future().result())
self._completed_task_array.append( self._completed_task_array.append(
completed_task_status.task()) completed_task_status.task())
self._task_status_array.remove(completed_task_status) self._task_status_array.remove(completed_task_status)
...@@ -107,7 +119,7 @@ if __name__ == "__main__": ...@@ -107,7 +119,7 @@ if __name__ == "__main__":
wf = workflow() wf = workflow()
wf.add_task(task(1, ["./test/tester", "10"], [])) wf.add_task(task(1, ["./test/tester", "10"], []))
wf.add_task(task(2, ["./test/tester", "1"], [])) wf.add_task(task(2, ["./test/tester", "1"], []))
wf.add_task(task(3, ["./test/tester", "5"], [])) wf.add_task(task(3, ["./test/tester", "5"], [2]))
wf.add_task(task(4, ["./test/tester", "1"], [])) wf.add_task(task(4, ["./test/tester", "1"], [1]))
wf.execute_workflow(2, 1) wf.execute_workflow(2, 1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment