前面我们学习了 Python 的一些基础知识,接下来我们将通过一个实战项目来巩固这些知识。
我们将使用 Python 来实现一个简化版本 类似 Ansible 的自动化运维工具,Ansible 是一个用于配置管理、应用程序部署、任务自动化等的工具,主要有以下特点:
- 声明式配置:使用 YAML 文件(Playbooks)定义系统的期望状态。
- 无代理:不需要在被管理的节点上安装代理软件,通过 SSH 连接执行任务。
- 模块化:通过模块化的方式扩展功能。
- 并行执行:同时在多个节点上执行任务。
项目分析
我们这里将来实现任务自动下发的功能,即通过 SSH 连接到多个远程服务器执行命令。根据我们要实现的功能,我们可以将项目分为以下几个模块:
- 核心引擎:负责任务调度、执行和管理。
- 配置管理:定义和存储配置数据。
- 模块管理:实现具体的任务和操作。
- 通信模块:通过 SSH、API 或其他协议与目标主机通信。
我们在编码之前首先要思考下如何来实现这个项目,下面我们可以先列出一些核心功能的实现方法:
任务调度和执行
- 任务队列:使用队列系统(如
queue.Queue
或multiprocessing.Queue
)来管理任务。 - 并行执行:使用多线程或多进程来并行执行任务。例如,使用
concurrent.futures.ThreadPoolExecutor
或multiprocessing.Pool
。
配置文件解析
- YAML 文件:使用 Python 的
PyYAML
库来解析 YAML 配置文件。配置文件定义了任务、目标主机和操作步骤。
模块管理
- 模块化设计:将不同的任务(如安装软件、配置文件、服务管理)封装成模块。每个模块执行特定的操作,并提供标准接口。
- 插件系统:实现插件系统,以便能够动态加载和执行不同的模块。
远程执行
- SSH 连接:我们可以使用
paramiko
库通过 SSH 连接到远程主机并执行命令。 - 无代理架构:确保不需要在远程主机上安装额外的软件,只需通过 SSH 进行操作。
错误处理和回滚
- 异常处理:在任务执行过程中处理异常情况,并记录详细的日志。
- 回滚机制:如果任务失败,能够恢复到任务执行前的状态。
项目实现
首先我们要明确的是任务软件都是先实现一个简单的版本,然后再逐步完善功能,同样我们这里也是这样,先实现一个核心的功能,然后再逐步完善,核心功能就是通过 SSH 连接到远程服务器执行命令,所以接下来我们将实现这个功能。
首先创建一个新的项目目录 myansible
,然后在目录下创建一个名为 myenv
的虚拟环境:
$ mkdir myansible $ cd myansible $ python3 -m venv myenv # 创建虚拟环境 $ source myenv/bin/activate # 激活虚拟环境
记得在 VSCode 中选择该虚拟环境的 Python 解释器。
配置文件解析
然后在项目根目录下面定义一个如下所示的 YAML 配置文件:
# playbook.yaml hosts: - address: host1.example.com username: root password: password - address: host2.example.com username: root password: password tasks: - name: Install nginx command: sudo apt-get install -y nginx - name: Start nginx command: sudo systemctl start nginx
我们希望通过上面的配置文件来实现在 host1.example.com
和 host2.example.com
上安装 nginx
并启动服务,如果还有更多主机或者任务,只需要在配置文件中添加即可,如果我们手动执行这些命令,需要登录到每台主机上执行,这样就会很麻烦,所以我们希望通过 Python 来实现这个功能。
要在程序中解析 YAML 文件,我们可以使用一个 PyYAML
库,可以通过 pip
安装:
(myenv) ➜ pip3 install pyyaml
然后我们就可以在项目根目录下面新建一个名为 config.py
的文件,用于解析 YAML 配置文件,代码如下:
import yaml # 导入 PyYAML 库 # 加载配置文件 def load_configuration(file_path: str): with open(file_path, 'r') as file: return yaml.safe_load(file) # 解析 YAML 文件 if __name__ == '__main__': config = load_configuration('playbook.yaml') print(config)
上面代码中我们定义了一个 load
函数,用于加载 YAML 配置文件。要解析 YAML 文件很简单,只需要使用 yaml.safe_load
函数即可,它会将 YAML 文件解析为 Python 字典对象。
下面我们用 if __name__ == '__main__'
来测试一下,运行代码,这一段的意思是如果直接运行这个文件,就会执行 load
函数,加载配置文件并打印出来,运行结果如下:
(myenv) ➜ python3 config.py {'hosts': [{'address': 'host1.example.com', 'username': 'root', 'password': 'password'}, {'address': 'host2.example.com', 'username': 'root', 'password': 'password'}], 'tasks': [{'name': 'Install nginx', 'command': 'sudo apt-get install -y nginx'}, {'name': 'Start nginx', 'command': 'sudo systemctl start nginx'}]}
可以看到我们成功加载了配置文件,并将其解析为 Python 字典对象,接下来我们就可以使用这些数据来连接远程服务器并执行命令。
远程执行任务
上面我们已经成功加载了配置文件,接下来我们就可以通过 SSH 连接到远程服务器并执行命令。要实现 SSH 连接,我们可以使用 paramiko
这个库来实现,这个库是一个纯 Python 实现的 SSH 客户端,主要用于远程执行命令和文件传输等任务,它是一个功能强大的工具,适用于需要进行 SSH 连接的自动化任务。可以通过 pip
安装这个包:
(myenv) ➜ pip3 install paramiko
下面我们先了解下 paramiko
的基本用法,然后再来实现远程执行任务的功能。
要连接到远程服务器,我们首先需要创建一个 SSHClient
对象,然后使用 connect
方法连接到远程服务器,示例代码如下:
import paramiko # 创建 SSHClient 对象 client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 自动添加主机密钥(如果之前未连接过) # 连接到远程服务器 client.connect(hostname='host1.example.com', port=22, username='root', password='password') # 执行命令 stdin, stdout, stderr = client.exec_command('ls -l') # 读取输出 print(stdout.read().decode()) print(stderr.read().decode()) # 关闭连接 client.close()
上面的代码中,我们首先创建了一个 SSHClient
对象,然后使用 connect
方法连接到远程服务器,接着使用 exec_command
方法执行命令,最后读取输出并关闭连接。这个整个过程其实就是一个典型的 SSH 连接过程。
这里需要注意的是 exec_command
方法返回的是一个三元组,分别是标准输入、标准输出和标准错误,我们可以通过这三个对象来读取命令的输入和输出。在读取输出的时候,需要使用 decode
方法将字节流转换为字符串。
将上面的代码保存到一个名为 ssh.py
的文件中,然后运行,可以看到输出结果:
(myenv) ➜ python3 ssh.py total 16M -rw-r--r-- 1 root root 7.1K Feb 22 11:21 Clusterfile -rw-r--r-- 1 root root 96 Mar 9 07:32 config.yaml -rw-r--r-- 1 root root 16M Feb 21 20:34 helm-v3.14.2-linux-amd64.tar.gz -rw-r--r-- 1 root root 1.2K Mar 2 08:25 mysql.yaml
可以看到我们成功连接到远程服务器并执行了 ls -l
命令。
接下来我们可以将上面的代码封装成一个函数,用于执行远程命令,代码如下:
# ssh.py import paramiko # 远程执行命令 def execute_remote_command(host, command, username, password): try: print(f'Execute command: {command} on {host} with { username}/{password}') client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(host, username=username, password=password) _, stdout, stderr = client.exec_command(command) output = stdout.read().decode() error = stderr.read().decode() client.close() # 关闭连接 return output, error except Exception as e: return None, str(e)
并发执行任务
上面我们已经实现了远程执行任务的功能,接下来我们需要实现并发执行任务的功能,即同时连接到多个远程服务器并执行命令。要实现并发执行任务,我们可以使用多线程或多进程来实现,由于我们这里是通过网络 IO 来实现并发,所以使用多线程更合适。
前面我们学过 Python 提供了一个 concurrent.futures
模块,可以用来实现并发编程,其中有一个 ThreadPoolExecutor
类,可以用来创建一个线程池,方便并发执行任务。我们可以使用这个类来实现并发执行任务的功能。
下面我们来实现一个函数,用于并发执行任务,代码如下:
# main.py from concurrent.futures import ThreadPoolExecutor from config import load_configuration from ssh import execute_remote_command # 并发执行任务 def execute_tasks(hosts, tasks): with ThreadPoolExecutor() as executor: futures = [] for host in hosts: for task in tasks: future = executor.submit(execute_remote_command, host['address'], task['command'], host['username'], host['password']) futures.append(future) for future in futures: output, err = future.result() if err: print(f'Error: {err}') else: print(f'Output: {output}') if __name__ == '__main__': conf = load_configuration('playbook.yaml') execute_tasks(conf['hosts'], conf['tasks'])
上面代码中,我们定义了一个 execute_tasks
函数,用于并发执行任务,首先创建一个 ThreadPoolExecutor
对象,然后使用 submit
方法提交任务,最后使用 result
方法获取任务的结果。
整体比较简单,最后我们加载配置文件并调用 execute_tasks
函数即可,运行代码,就可以看到输出结果了。