logo

项目分析与简单实现(实战)

前面我们学习了 Python 的一些基础知识,接下来我们将通过一个实战项目来巩固这些知识。

我们将使用 Python 来实现一个简化版本 类似 Ansible 的自动化运维工具,Ansible 是一个用于配置管理、应用程序部署、任务自动化等的工具,主要有以下特点:

  • 声明式配置:使用 YAML 文件(Playbooks)定义系统的期望状态。
  • 无代理:不需要在被管理的节点上安装代理软件,通过 SSH 连接执行任务。
  • 模块化:通过模块化的方式扩展功能。
  • 并行执行:同时在多个节点上执行任务。

项目分析

我们这里将来实现任务自动下发的功能,即通过 SSH 连接到多个远程服务器执行命令。根据我们要实现的功能,我们可以将项目分为以下几个模块:

  • 核心引擎:负责任务调度、执行和管理。
  • 配置管理:定义和存储配置数据。
  • 模块管理:实现具体的任务和操作。
  • 通信模块:通过 SSH、API 或其他协议与目标主机通信。

我们在编码之前首先要思考下如何来实现这个项目,下面我们可以先列出一些核心功能的实现方法:

任务调度和执行

  • 任务队列:使用队列系统(如 queue.Queuemultiprocessing.Queue)来管理任务。
  • 并行执行:使用多线程或多进程来并行执行任务。例如,使用 concurrent.futures.ThreadPoolExecutormultiprocessing.Pool

配置文件解析

  • YAML 文件:使用 Python 的 PyYAML 库来解析 YAML 配置文件。配置文件定义了任务、目标主机和操作步骤。

模块管理

  • 模块化设计:将不同的任务(如安装软件、配置文件、服务管理)封装成模块。每个模块执行特定的操作,并提供标准接口。
  • 插件系统:实现插件系统,以便能够动态加载和执行不同的模块。

远程执行

  • SSH 连接:我们可以使用 paramiko 库通过 SSH 连接到远程主机并执行命令。
  • 无代理架构:确保不需要在远程主机上安装额外的软件,只需通过 SSH 进行操作。

错误处理和回滚

  • 异常处理:在任务执行过程中处理异常情况,并记录详细的日志。
  • 回滚机制:如果任务失败,能够恢复到任务执行前的状态。

项目实现

首先我们要明确的是任务软件都是先实现一个简单的版本,然后再逐步完善功能,同样我们这里也是这样,先实现一个核心的功能,然后再逐步完善,核心功能就是通过 SSH 连接到远程服务器执行命令,所以接下来我们将实现这个功能。

首先创建一个新的项目目录 myansible,然后在目录下创建一个名为 myenv 的虚拟环境:

$ mkdir myansible $ cd myansible $ python3 -m venv myenv # 创建虚拟环境 $ source myenv/bin/activate # 激活虚拟环境

记得在 VSCode 中选择该虚拟环境的 Python 解释器。

配置文件解析

然后在项目根目录下面定义一个如下所示的 YAML 配置文件:

# playbook.yaml hosts: - address: host1.example.com username: root password: password - address: host2.example.com username: root password: password tasks: - name: Install nginx command: sudo apt-get install -y nginx - name: Start nginx command: sudo systemctl start nginx

我们希望通过上面的配置文件来实现在 host1.example.comhost2.example.com 上安装 nginx 并启动服务,如果还有更多主机或者任务,只需要在配置文件中添加即可,如果我们手动执行这些命令,需要登录到每台主机上执行,这样就会很麻烦,所以我们希望通过 Python 来实现这个功能。

要在程序中解析 YAML 文件,我们可以使用一个 PyYAML 库,可以通过 pip 安装:

(myenv) ➜ pip3 install pyyaml

然后我们就可以在项目根目录下面新建一个名为 config.py 的文件,用于解析 YAML 配置文件,代码如下:

import yaml # 导入 PyYAML 库 # 加载配置文件 def load_configuration(file_path: str): with open(file_path, 'r') as file: return yaml.safe_load(file) # 解析 YAML 文件 if __name__ == '__main__': config = load_configuration('playbook.yaml') print(config)

上面代码中我们定义了一个 load 函数,用于加载 YAML 配置文件。要解析 YAML 文件很简单,只需要使用 yaml.safe_load 函数即可,它会将 YAML 文件解析为 Python 字典对象。

下面我们用 if __name__ == '__main__' 来测试一下,运行代码,这一段的意思是如果直接运行这个文件,就会执行 load 函数,加载配置文件并打印出来,运行结果如下:

(myenv) ➜ python3 config.py {'hosts': [{'address': 'host1.example.com', 'username': 'root', 'password': 'password'}, {'address': 'host2.example.com', 'username': 'root', 'password': 'password'}], 'tasks': [{'name': 'Install nginx', 'command': 'sudo apt-get install -y nginx'}, {'name': 'Start nginx', 'command': 'sudo systemctl start nginx'}]}

可以看到我们成功加载了配置文件,并将其解析为 Python 字典对象,接下来我们就可以使用这些数据来连接远程服务器并执行命令。

远程执行任务

上面我们已经成功加载了配置文件,接下来我们就可以通过 SSH 连接到远程服务器并执行命令。要实现 SSH 连接,我们可以使用 paramiko 这个库来实现,这个库是一个纯 Python 实现的 SSH 客户端,主要用于远程执行命令和文件传输等任务,它是一个功能强大的工具,适用于需要进行 SSH 连接的自动化任务。可以通过 pip 安装这个包:

(myenv) ➜ pip3 install paramiko

下面我们先了解下 paramiko 的基本用法,然后再来实现远程执行任务的功能。

要连接到远程服务器,我们首先需要创建一个 SSHClient 对象,然后使用 connect 方法连接到远程服务器,示例代码如下:

import paramiko # 创建 SSHClient 对象 client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 自动添加主机密钥(如果之前未连接过) # 连接到远程服务器 client.connect(hostname='host1.example.com', port=22, username='root', password='password') # 执行命令 stdin, stdout, stderr = client.exec_command('ls -l') # 读取输出 print(stdout.read().decode()) print(stderr.read().decode()) # 关闭连接 client.close()

上面的代码中,我们首先创建了一个 SSHClient 对象,然后使用 connect 方法连接到远程服务器,接着使用 exec_command 方法执行命令,最后读取输出并关闭连接。这个整个过程其实就是一个典型的 SSH 连接过程。

这里需要注意的是 exec_command 方法返回的是一个三元组,分别是标准输入、标准输出和标准错误,我们可以通过这三个对象来读取命令的输入和输出。在读取输出的时候,需要使用 decode 方法将字节流转换为字符串。

将上面的代码保存到一个名为 ssh.py 的文件中,然后运行,可以看到输出结果:

(myenv) ➜ python3 ssh.py total 16M -rw-r--r-- 1 root root 7.1K Feb 22 11:21 Clusterfile -rw-r--r-- 1 root root 96 Mar 9 07:32 config.yaml -rw-r--r-- 1 root root 16M Feb 21 20:34 helm-v3.14.2-linux-amd64.tar.gz -rw-r--r-- 1 root root 1.2K Mar 2 08:25 mysql.yaml

可以看到我们成功连接到远程服务器并执行了 ls -l 命令。

接下来我们可以将上面的代码封装成一个函数,用于执行远程命令,代码如下:

# ssh.py import paramiko # 远程执行命令 def execute_remote_command(host, command, username, password): try: print(f'Execute command: {command} on {host} with { username}/{password}') client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(host, username=username, password=password) _, stdout, stderr = client.exec_command(command) output = stdout.read().decode() error = stderr.read().decode() client.close() # 关闭连接 return output, error except Exception as e: return None, str(e)

并发执行任务

上面我们已经实现了远程执行任务的功能,接下来我们需要实现并发执行任务的功能,即同时连接到多个远程服务器并执行命令。要实现并发执行任务,我们可以使用多线程或多进程来实现,由于我们这里是通过网络 IO 来实现并发,所以使用多线程更合适。

前面我们学过 Python 提供了一个 concurrent.futures 模块,可以用来实现并发编程,其中有一个 ThreadPoolExecutor 类,可以用来创建一个线程池,方便并发执行任务。我们可以使用这个类来实现并发执行任务的功能。

下面我们来实现一个函数,用于并发执行任务,代码如下:

# main.py from concurrent.futures import ThreadPoolExecutor from config import load_configuration from ssh import execute_remote_command # 并发执行任务 def execute_tasks(hosts, tasks): with ThreadPoolExecutor() as executor: futures = [] for host in hosts: for task in tasks: future = executor.submit(execute_remote_command, host['address'], task['command'], host['username'], host['password']) futures.append(future) for future in futures: output, err = future.result() if err: print(f'Error: {err}') else: print(f'Output: {output}') if __name__ == '__main__': conf = load_configuration('playbook.yaml') execute_tasks(conf['hosts'], conf['tasks'])

上面代码中,我们定义了一个 execute_tasks 函数,用于并发执行任务,首先创建一个 ThreadPoolExecutor 对象,然后使用 submit 方法提交任务,最后使用 result 方法获取任务的结果。

整体比较简单,最后我们加载配置文件并调用 execute_tasks 函数即可,运行代码,就可以看到输出结果了。