logo

该视频仅会员有权观看

立即开通课程「Python 入门」权限。

¥
199
/ 年

任务依赖关系(实战)

上面我们已经实现了一个简单的任务执行引擎,但是这个引擎还有一个问题,就是任务之间没有依赖关系,即任务之间是并行执行的,有时候我们希望任务之间有依赖关系,比如任务 A 执行完后再执行任务 B,这时我们就需要实现任务之间的依赖关系了。

要实现任务之间的依赖关系,我们可以通过拓扑排序来实现,拓扑排序是一种用于有向无环图(DAG, Directed Acyclic Graph) 的排序算法。它的目标是为图中的顶点生成一个线性顺序,使得对于图中的每一条有向边 u -> v,顶点 u 都排在顶点 v 的前面。简单来说,就是在处理依赖关系时,确保先处理所有依赖项,再处理依赖这些项的元素。

比如我们有一个简单的任务依赖关系图,如下所示:

Task A | v Task B --> Task C | | v v Task D --> Task E

在这个例子中:

  • Task A 是所有任务的起点,必须最先执行。
  • Task B 依赖于 Task A,即 Task B 必须在 Task A 之后执行。
  • Task D 依赖于 Task B。
  • Task C 也依赖于 Task B。
  • Task E 依赖于 Task D 和 Task C。

要实现拓扑排序,可以按照以下几个步骤:

  1. 找出没有依赖的节点:这些节点可以作为任务的起点,因为它们不依赖于其他任务。
  2. 将这些节点移除:从图中移除这些节点,并更新剩下的节点的依赖关系。
  3. 重复步骤 1 和 2:继续找出新的没有依赖的节点,直到所有节点都被移除。这个过程中生成的节点顺序就是拓扑排序的结果。

比如上面的任务图,我们就可以按照以下步骤进行拓扑排序:

  • 开始:找出没有依赖的任务,首先是 Task A。
  • 移除 Task A,剩下的任务是 Task B, C, D, E。
  • 找出 Task B(现在它没有依赖)并移除。
  • 找出 Task C 和 Task D,它们现在都没有依赖,可以同时移除。
  • 最后剩下 Task E,移除。

所以最后生成的拓扑排序序列可能是:

Task A -> Task B -> Task C -> Task D -> Task E

或者

Task A -> Task B -> Task D -> Task C -> Task E

要在 Python 中实现拓扑排序有很多方法,最简单的就是使用 graphlib.TopologicalSorter 类,这个类是 Python 3.9 新增的一个类,用于处理带有依赖关系的任务,通过拓扑排序来确定任务的执行顺序。

要使用这个类,首先需要 Python 3.9 以上版本,然后就可以使用这个类来实现拓扑排序,代码如下:

from graphlib import TopologicalSorter # 定义任务和依赖关系 sorter = TopologicalSorter() sorter.add('B', 'A') # B 依赖于 A sorter.add('D', 'B') # D 依赖于 B sorter.add('C', 'B') # C 依赖于 B sorter.add('E', 'C', 'D') # E 依赖于 C 和 D sorter.add('A') # A 没有依赖 # 执行拓扑排序 sorter.prepare() # 准备排序 while sorter.is_active(): # 判断是否还有任务 for node in sorter.get_ready(): # 获取准备好的任务 print(node) # 输出排序结果 sorter.done(node) # 标记任务完成

运行上述代码,可能的输出顺序为:

A B C E D

拓扑排序非常适合用于任务调度、构建系统、依赖解析等场景。它确保任务按依赖关系正确排序,从而避免了死锁和循环依赖等问题。

了解了拓扑排序的原理后,我们就可以实现任务之间的依赖关系了,比如现在的 playbook.yaml 文件如下:

# playbook.yaml tasks: - name: List all files in the home directory command: ls -l - name: Create a new directory command: mkdir /tmp/ansible depends_on: - List all files in the home directory - name: Remove the directory command: rm -rf /tmp/ansible depends_on: - List all files in the home directory - Create a new directory

在这里我们在任务中增加了 depends_on 字段,用于表示任务的依赖关系,这样我们就可以根据这个字段来实现任务之间的依赖关系。

首先我们需要修改 Task 类,增加一个 depends_on 字段,用于表示任务的依赖关系,代码如下:

# task.py class Task: def __init__(self, name: str, command: str, depends_on: list[str] = []): self.name = name self.command = command self.result: dict[str, str | int] = {} # 任务执行结果 self.status = "pending" # 任务状态,pending, success, failed.... self.depends_on = depends_on # 依赖的任务 def __repr__(self): """返回任务的字符串表示""" return f"Task({self.name!r}, {self.command!r}, status={self.status!r}, depends_on={self.depends_on!r})"

接下来我们需要重构下 ExecutionEngine 类,使其支持任务之间的依赖关系,代码如下:

# engine.py from concurrent.futures import ThreadPoolExecutor, Future from graphlib import TopologicalSorter from typing import Any from host import Host from task import Task class ExecutionEngine: """任务执行引擎""" def __init__(self): self.hosts: list[Host] = [] self.tasks: dict[str, Task] = {} def add_host(self, host: Host): """添加主机""" self.hosts.append(host) def add_task(self, task: Task): """添加任务""" self.tasks[task.name] = task def _calculate_max_width(self): task_headers = [f"TASK [{task.name}]" for task in self.tasks.values()] play_headers = [f"PLAY [{host.address}]" for host in self.hosts] all_headers = task_headers + play_headers max_header_length = max(map(len, all_headers)) return max_header_length + 50 def format_output(self, host: Host, task: Task, max_width: int): """格式化输出""" status = "ok" if task.status == "success" else "failed" header_length = max_width - len(f"TASK [{task.name}]") header = f"TASK [{task.name}] {'*' * header_length}" result = f"{status}: [{host.address}]\n" if task.result["stdout"]: result += f"stdout:\n{task.result['stdout']}" if task.result["stderr"]: result += f"stderr:\n{task.result['stderr']}" return f"{header}\n{result}" def play_recap(self, host: Host, max_width: int): """输出任务执行的统计结果""" recap = { "ok": 0, "failed": 0, "changed": 0, "unreachable": 0, "skipped": 0, "rescued": 0, "ignored": 0 } for task in self.tasks.values(): if task.status == "success": recap["ok"] += 1 elif task.status == "failed": recap["failed"] += 1 recap_header_length = max_width - len(f"PLAY RECAP") print(f"PLAY RECAP {'*' * recap_header_length}") print(f"{host.address} : ok={ recap['ok']} failed={recap['failed']} skipped={recap['skipped']} rescued={recap['rescued']} ignored={recap['ignored']}\n\n") def _execute_task(self, host: Host, task: Task): """执行单个任务""" task.result = host.execute_command(task.command) # 执行任务 task.status = "success" if task.result[ "exit_status"] == 0 else "failed" # 设置任务状态 return host, task def run(self): """执行任务""" max_width = self._calculate_max_width() # 计算最大宽度 with ThreadPoolExecutor() as executor: for host in self.hosts: play_header_length = max_width - len(f"PLAY [{host.address}]") print(f"PLAY [{host.address}] {'*' * play_header_length}\n") # 不能直接循环self.tasks,因为这样所有的任务都并发执行了,就没有依赖关系了 # 创建 TopologicalSorter 对象 sorter: TopologicalSorter[str] = TopologicalSorter() # 添加任务和依赖关系 for task in self.tasks.values(): sorter.add(task.name, *task.depends_on) # 准备拓扑排序 sorter.prepare() # 任务还没有执行完成,就继续执行 while sorter.is_active(): # 获取可以执行的任务列表 ready_tasks = sorter.get_ready() # 执行任务 futures: list[Future[Any]] = [] for task_name in ready_tasks: future = executor.submit( self._execute_task, host, self.tasks[task_name]) futures.append(future) for future in futures: # 等待任务执行完成 host, task = future.result() # 输出任务执行结果 print(self.format_output(host, task, max_width)) for task_name in ready_tasks: # 标记任务已经执行完成 sorter.done(task_name) self.play_recap(host, max_width)

上面代码中我们将执行引擎中的 tasks 列表更改成了 dict 类型,这样我们可以通过任务的名称来获取任务,然后我们在 run 方法中创建了一个 TopologicalSorter 对象,然后将任务及其依赖关系添加到这个对象中,最后通过 get_ready 方法获取可以执行的任务列表,然后并发执行这些任务。

最后的完整输出结果如下:

PLAY [master] ******************************************************************************** TASK [List all files in the home directory] ************************************************** ok: [master] stdout: total 15928 -rw-r--r-- 1 root root 7214 Feb 22 11:21 Clusterfile -rw-r--r-- 1 root root 96 Mar 9 07:32 config.yaml -rw-r--r-- 1 root root 1466 Mar 17 06:17 cr-demo.yaml -rw-r--r-- 1 root root 186 Mar 17 06:18 crd-demo.yaml -rw-r--r-- 1 root root 16132298 Feb 21 2024 helm-v3.14.2-linux-amd64.tar.gz -rw-r--r-- 1 root root 1205 Mar 2 08:25 mysql.yaml drwxr-xr-x 10 root root 4096 Mar 2 03:40 nfs-subdir-external-provisioner -rw-r--r-- 1 root root 257 Mar 2 08:07 pdb.yaml -rw-r--r-- 1 root root 21 Aug 14 08:16 remote_file.txt -rw-r--r-- 1 root root 398 Mar 2 07:10 wd1-svc.yaml -rw-r--r-- 1 root root 1641 Mar 2 07:03 wd1.yaml -rw-r--r-- 1 root root 122850 Mar 9 06:25 wget-log -rw-r--r-- 1 root root 2360 Mar 2 08:22 wordpress-v2.yaml -rw-r--r-- 1 root root 1476 Mar 2 07:39 wordpress.yaml TASK [Create a new directory] **************************************************************** ok: [master] TASK [Remove the directory] ****************************************************************** ok: [master] PLAY RECAP *********************************************************************************** master : ok=3 failed=0 skipped=0 rescued=0 ignored=0 PLAY [node1] ********************************************************************************* TASK [List all files in the home directory] ************************************************** ok: [node1] stdout: total 0 TASK [Create a new directory] **************************************************************** ok: [node1] TASK [Remove the directory] ****************************************************************** ok: [node1] PLAY RECAP *********************************************************************************** node1 : ok=3 failed=0 skipped=0 rescued=0 ignored=0 All tasks have been executed.