《详解Python通过paramiko模块批量执行SSH命令》
在运维自动化场景中,通过SSH协议批量管理多台服务器是常见需求。Python的paramiko模块提供了SSHv2协议的纯Python实现,能够安全高效地执行远程命令、传输文件等操作。本文将系统讲解如何使用paramiko实现批量SSH命令执行,涵盖基础用法、异常处理、性能优化及完整案例。
一、paramiko模块基础
paramiko是Python实现的SSH协议库,包含SSHClient和SFTPClient两个核心类。SSHClient用于执行远程命令,SFTPClient用于文件传输。安装命令如下:
pip install paramiko
1.1 基本SSH连接
建立SSH连接需要主机、端口、用户名和密码(或密钥)。以下是最简单的连接示例:
import paramiko
def basic_ssh_connection():
# 创建SSH客户端实例
ssh = paramiko.SSHClient()
# 自动添加主机密钥(生产环境应使用known_hosts)
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# 连接服务器
ssh.connect(hostname='192.168.1.100',
port=22,
username='root',
password='your_password')
# 执行命令
stdin, stdout, stderr = ssh.exec_command('ls -l')
# 获取命令输出
print(stdout.read().decode())
except Exception as e:
print(f"SSH连接失败: {str(e)}")
finally:
# 关闭连接
ssh.close()
1.2 密钥认证方式
相比密码认证,密钥认证更安全。需提前将公钥部署到服务器的~/.ssh/authorized_keys文件中:
def key_auth_connection():
private_key = paramiko.RSAKey.from_private_key_file('/path/to/private_key')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(hostname='192.168.1.100',
port=22,
username='root',
pkey=private_key)
stdin, stdout, stderr = ssh.exec_command('free -m')
print(stdout.read().decode())
finally:
ssh.close()
二、批量执行SSH命令实现
实际运维中需要同时管理多台服务器,可通过以下方式实现批量操作。
2.1 服务器列表配置
使用JSON或YAML格式存储服务器信息:
# servers.json
[
{
"hostname": "server1",
"ip": "192.168.1.100",
"port": 22,
"username": "root",
"password": "pass123"
},
{
"hostname": "server2",
"ip": "192.168.1.101",
"port": 22,
"username": "admin",
"key_path": "/path/to/key"
}
]
2.2 批量执行函数
封装通用执行函数,支持密码和密钥两种认证方式:
import json
import paramiko
from typing import List, Dict, Optional
def load_servers(file_path: str) -> List[Dict]:
with open(file_path, 'r') as f:
return json.load(f)
def execute_remote_command(
host_info: Dict,
command: str,
timeout: int = 10
) -> Optional[str]:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# 根据配置选择认证方式
if 'key_path' in host_info:
private_key = paramiko.RSAKey.from_private_key_file(host_info['key_path'])
ssh.connect(
hostname=host_info['ip'],
port=host_info.get('port', 22),
username=host_info['username'],
pkey=private_key,
timeout=timeout
)
else:
ssh.connect(
hostname=host_info['ip'],
port=host_info.get('port', 22),
username=host_info['username'],
password=host_info['password'],
timeout=timeout
)
stdin, stdout, stderr = ssh.exec_command(command)
result = stdout.read().decode()
error = stderr.read().decode()
if error:
print(f"{host_info['hostname']} 执行错误: {error}")
return None
return result
except Exception as e:
print(f"{host_info['hostname']} 连接失败: {str(e)}")
return None
finally:
ssh.close()
2.3 完整批量执行示例
将上述功能整合为完整脚本:
def batch_execute(servers_file: str, command: str):
servers = load_servers(servers_file)
results = {}
for server in servers:
hostname = server.get('hostname', 'unknown')
print(f"正在执行 {hostname}...")
output = execute_remote_command(server, command)
if output is not None:
results[hostname] = output
# 打印所有结果
for hostname, output in results.items():
print(f"\n=== {hostname} 执行结果 ===")
print(output)
if __name__ == "__main__":
batch_execute('servers.json', 'df -h')
三、高级功能实现
3.1 多线程加速执行
使用concurrent.futures实现并发执行:
from concurrent.futures import ThreadPoolExecutor
def concurrent_execute(servers_file: str, command: str, max_workers: int = 5):
servers = load_servers(servers_file)
results = {}
def process_server(server):
hostname = server.get('hostname', 'unknown')
output = execute_remote_command(server, command)
if output is not None:
results[hostname] = output
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_server, server) for server in servers]
for future in futures:
future.result() # 等待所有任务完成
# 打印结果...(同上)
3.2 交互式命令执行
对于需要交互的命令(如sudo提权),需通过invoke_shell()实现:
def interactive_command(host_info: Dict, commands: List[str]):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(**{k:v for k,v in host_info.items() if k != 'hostname'})
# 创建交互式shell
shell = ssh.invoke_shell()
for cmd in commands:
shell.send(cmd + '\n')
time.sleep(0.5) # 等待命令执行
# 获取所有输出
output = ""
while shell.recv_ready():
output += shell.recv(65535).decode()
print(output)
finally:
ssh.close()
3.3 命令执行超时控制
通过select模块实现非阻塞读取和超时控制:
import select
def execute_with_timeout(ssh, command: str, timeout: int = 10) -> Optional[str]:
try:
stdin, stdout, stderr = ssh.exec_command(command)
# 设置通道超时
transport = ssh.get_transport()
if transport:
transport.set_keepalive(30)
# 读取输出(带超时)
max_timeout = timeout
start_time = time.time()
output = ""
while time.time() - start_time = max_timeout:
print("命令执行超时")
return None
return output
except Exception as e:
print(f"执行异常: {str(e)}")
return None
四、完整案例:批量部署应用
以下是一个完整的批量部署案例,包含文件传输和命令执行:
import os
import time
from typing import List, Dict
class BatchDeployer:
def __init__(self, servers_file: str):
self.servers = self._load_servers(servers_file)
def _load_servers(self, file_path: str) -> List[Dict]:
with open(file_path, 'r') as f:
return json.load(f)
def _get_ssh_client(self, host_info: Dict) -> paramiko.SSHClient:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if 'key_path' in host_info:
key = paramiko.RSAKey.from_private_key_file(host_info['key_path'])
client.connect(
hostname=host_info['ip'],
port=host_info.get('port', 22),
username=host_info['username'],
pkey=key
)
else:
client.connect(
hostname=host_info['ip'],
port=host_info.get('port', 22),
username=host_info['username'],
password=host_info['password']
)
return client
def upload_file(self, host_info: Dict, local_path: str, remote_path: str) -> bool:
try:
ssh = self._get_ssh_client(host_info)
sftp = ssh.open_sftp()
# 确保远程目录存在
remote_dir = os.path.dirname(remote_path)
try:
sftp.chdir(remote_dir)
except IOError:
sftp.mkdir(remote_dir)
sftp.chdir(remote_dir)
sftp.put(local_path, remote_path)
sftp.close()
ssh.close()
return True
except Exception as e:
print(f"文件传输失败: {str(e)}")
return False
def execute_commands(self, host_info: Dict, commands: List[str]) -> Optional[str]:
ssh = self._get_ssh_client(host_info)
output = ""
try:
for cmd in commands:
stdin, stdout, stderr = ssh.exec_command(cmd)
output += f"\n执行: {cmd}\n"
output += stdout.read().decode()
error = stderr.read().decode()
if error:
output += f"\n错误: {error}"
finally:
ssh.close()
return output if output.strip() else None
def deploy_app(self, app_name: str, file_path: str, commands: List[str]):
results = {}
for server in self.servers:
hostname = server.get('hostname', 'unknown')
print(f"\n=== 开始部署 {hostname} ===")
# 1. 上传文件
remote_path = f"/tmp/{app_name}.tar.gz"
if not self.upload_file(server, file_path, remote_path):
results[hostname] = "文件上传失败"
continue
# 2. 执行部署命令
cmd_output = self.execute_commands(server, commands)
results[hostname] = cmd_output or "部署完成但无输出"
# 打印结果
for hostname, result in results.items():
print(f"\n{hostname} 部署结果:")
print(result)
# 使用示例
if __name__ == "__main__":
deployer = BatchDeployer('servers.json')
# 部署参数
app_name = "myapp"
local_file = "./myapp.tar.gz"
deploy_commands = [
f"tar -xzf /tmp/{app_name}.tar.gz -C /opt/",
"systemctl restart myapp",
"systemctl status myapp"
]
deployer.deploy_app(app_name, local_file, deploy_commands)
五、最佳实践与注意事项
5.1 安全建议
1. 禁用AutoAddPolicy,使用known_hosts文件验证主机密钥
2. 密钥文件权限设置为600
3. 避免在代码中硬编码密码,使用环境变量或配置文件
5.2 性能优化
1. 保持SSH连接复用(使用SSHClient实例缓存)
2. 合理设置并发线程数(通常为CPU核心数*2)
3. 对大文件传输使用分块读取
5.3 错误处理
1. 捕获paramiko.SSHException及其子类
2. 处理网络超时和连接中断
3. 记录详细的错误日志
六、总结
本文详细介绍了paramiko模块在批量SSH命令执行中的应用,从基础连接建立到高级并发控制,涵盖了生产环境中的常见需求。通过合理设计,paramiko可以成为强大的自动化运维工具,显著提升管理效率。
关键词:Python、paramiko、SSH、批量执行、自动化运维、密钥认证、多线程、文件传输
简介:本文系统讲解了使用Python paramiko模块实现批量SSH命令执行的方法,包含基础连接、密钥认证、多线程并发、交互式命令执行等高级功能,并提供完整的部署案例和最佳实践建议。