上面我们已经实现了通过 Python 连接到多个远程服务器并执行命令的功能,但是这个功能还有很多地方可以优化,比如我们可以用类来抽象出任务和主机,然后将执行任务的逻辑封装到类中,这样可以更好地管理任务和主机。
下面我们就来优化一下代码,首先我们定义一个 Task
类,用于表示任务,代码如下:
# task.py class Task: def __init__(self, name, command): self.name = name self.command = command self.result = None self.status = "pending" # 'pending', 'success', 'failure', etc. def __repr__(self): return f"Task(name={self.name}, command={self.command}, status={self.status})"
上面代码中我们定义了一个 Task
类,用于表示任务,这个类有一个 name
属性表示任务名称,一个 command
属性表示要执行的命令,一个 result
属性表示任务的结果,一个 status
属性表示任务的状态,初始状态为 pending
。这里我们在任务的基础上增加了 result
和 status
两个属性,用于表示任务的执行结果和状态,这样我们可以更方便的跟踪任务的执行情况。
同样我们再定义一个 Host
类,用于表示主机,其
# host.py class Host: """主机类""" def __init__(self, address, username, password): self.address = address self.username = username self.password = password def __repr__(self): return f"Host(address={self.address}, username={self.username})"
然后我们再来优化一下 execute_remote_command
函数,将其封装到 Host
类中,代码如下:
# host.py import paramiko class Host: """主机类""" def __init__(self, address, username, password): self.address = address self.username = username self.password = password def __repr__(self): return f"Host(address={self.address}, username={self.username})" def execute_command(self, command): client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: # 连接到远程服务器 client.connect(self.address, username=self.username, password=self.password) # 执行命令 _, stdout, stderr = client.exec_command(command) # 获取命令退出状态码 exit_status = stdout.channel.recv_exit_status() # 读取输出 result = { "stdout": stdout.read().decode(), "stderr": stderr.read().decode(), "exit_status": exit_status } except Exception as e: result = {"stdout": "", "stderr": str(e), "exit_status": -1} finally: client.close() return result
上面代码我们定义了一个 Host
对象,用于表示主机,然后我们将远程执行命令的逻辑封装到了 execute_command
方法中,这是因为执行命令是针对主机的,所以将其封装到 Host
类中更合适。
上面执行命令的代码中和之前的有一点不同,我们通过 stdout.channel.recv_exit_status()
获取命令的退出状态码,这个状态码可以用于判断命令是否成功执行,退出状态码为 0
表示命令执行成功,非零值表示命令执行失败或出现错误。最后我们将输出、错误和退出状态码封装到一个字典中返回,这样我们就可以更方便的获取命令的执行结果了。
现在任务和主机都有了自己的类,但是我们还需要一个东西能够将任务和主机关联起来,我们可以称其为执行引擎,这个引擎负责任务的调度和执行,下面我们来实现这个引擎。
首先我们定义一个 Engine
类,用于执行任务,代码如下:
# engine.py from host import Host from task import Task from concurrent.futures import ThreadPoolExecutor class ExecutionEngine: """执行引擎""" def __init__(self): self.hosts = [] self.tasks = [] def add_host(self, executor): self.hosts.append(executor) def add_task(self, task): self.tasks.append(task) def run(self): """执行任务""" # 执行单个任务 def _execute_task(host, task): task.result = host.execute_command(task.command) # 退出码为 0 表示成功 task.status = "success" if task.result["exit_status"] == 0 else "failure" return task, host with ThreadPoolExecutor(max_workers=5) as executor: # 为每个主机执行任务 futures = [] for host in self.hosts: print(f"PLAY [{host.address}] {'*' * 50}") for task in self.tasks: futures.append(executor.submit(_execute_task, host, task)) for future in as_completed(futures): task, host = future.result() # 获取 task 和 host try: if task.status == "failure": raise Exception(task.result["stderr"]) except Exception as exc: task.result = {"stdout": "", "stderr": str(exc), "exit_status": -1} task.status = "failure" print(f"{host.address} | {task.name} | {task.result} | {task.status}")
这里我们定义了一个 ExecutionEngine
类,用于执行任务,我们可以通过 add_host
和 add_task
方法来添加主机和任务,然后通过 run
方法来执行任务。 在 run
方法中同样我们使用 ThreadPoolExecutor
来并发执行任务,最后我们通过 as_completed
函数来获取任务的结果。
现在我们已经实现了任务执行引擎,接下来我们可以将上面的代码整合到一起,然后运行,就可以看到输出结果了。
# main.py from config import load_configuration from task import Task from host import Host from engine import ExecutionEngine if __name__ == '__main__': conf = load_configuration('playbook.yaml') engine = ExecutionEngine() # 将主机添加到任务管理器中 for host in conf['hosts']: engine.add_host(Host(host['address'], host['username'], host['password'])) # 将任务添加到任务管理器中 for task in conf['tasks']: manager.add_task(Task(task['name'], task['command'])) # 运行任务引擎 engine.run() print('任务执行完成!!!')
这样我们就使用类的方式来重新优化了代码,使其更加清晰和易于管理,当然这仍然还是一个简单的版本,实际上 Ansible 还有很多功能,比如模块化、插件系统、错误处理、回滚机制等,这些功能可以在这个基础上继续完善。