位置: 文档库 > Python > 文档下载预览

《详解python通过paramiko模块批量执行ssh命令.doc》

1. 下载的文档为doc格式,下载后可用word或者wps进行编辑;

2. 将本文以doc文档格式下载到电脑,方便收藏和打印;

3. 下载后的文档,内容与下面显示的完全一致,下载之前请确认下面内容是否您想要的,是否完整.

点击下载文档

详解python通过paramiko模块批量执行ssh命令.doc

《详解Python通过paramiko模块批量执行SSH命令》

在运维自动化场景中,通过SSH协议批量管理多台服务器是常见需求。Python的paramiko模块提供了SSHv2协议的纯Python实现,能够安全高效地执行远程命令、传输文件等操作。本文将系统讲解如何使用paramiko实现批量SSH命令执行,涵盖基础用法、异常处理、性能优化及完整案例。

一、paramiko模块基础

paramiko是Python实现的SSH协议库,包含SSHClient和SFTPClient两个核心类。SSHClient用于执行远程命令,SFTPClient用于文件传输。安装命令如下:

pip install paramiko

1.1 基本SSH连接

建立SSH连接需要主机、端口、用户名和密码(或密钥)。以下是最简单的连接示例:

import paramiko

def basic_ssh_connection():
    # 创建SSH客户端实例
    ssh = paramiko.SSHClient()
    
    # 自动添加主机密钥(生产环境应使用known_hosts)
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        # 连接服务器
        ssh.connect(hostname='192.168.1.100',
                   port=22,
                   username='root',
                   password='your_password')
        
        # 执行命令
        stdin, stdout, stderr = ssh.exec_command('ls -l')
        
        # 获取命令输出
        print(stdout.read().decode())
        
    except Exception as e:
        print(f"SSH连接失败: {str(e)}")
    finally:
        # 关闭连接
        ssh.close()

1.2 密钥认证方式

相比密码认证,密钥认证更安全。需提前将公钥部署到服务器的~/.ssh/authorized_keys文件中:

def key_auth_connection():
    private_key = paramiko.RSAKey.from_private_key_file('/path/to/private_key')
    
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        ssh.connect(hostname='192.168.1.100',
                   port=22,
                   username='root',
                   pkey=private_key)
        
        stdin, stdout, stderr = ssh.exec_command('free -m')
        print(stdout.read().decode())
        
    finally:
        ssh.close()

二、批量执行SSH命令实现

实际运维中需要同时管理多台服务器,可通过以下方式实现批量操作。

2.1 服务器列表配置

使用JSON或YAML格式存储服务器信息:

# servers.json
[
    {
        "hostname": "server1",
        "ip": "192.168.1.100",
        "port": 22,
        "username": "root",
        "password": "pass123"
    },
    {
        "hostname": "server2",
        "ip": "192.168.1.101",
        "port": 22,
        "username": "admin",
        "key_path": "/path/to/key"
    }
]

2.2 批量执行函数

封装通用执行函数,支持密码和密钥两种认证方式:

import json
import paramiko
from typing import List, Dict, Optional

def load_servers(file_path: str) -> List[Dict]:
    with open(file_path, 'r') as f:
        return json.load(f)

def execute_remote_command(
    host_info: Dict,
    command: str,
    timeout: int = 10
) -> Optional[str]:
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        # 根据配置选择认证方式
        if 'key_path' in host_info:
            private_key = paramiko.RSAKey.from_private_key_file(host_info['key_path'])
            ssh.connect(
                hostname=host_info['ip'],
                port=host_info.get('port', 22),
                username=host_info['username'],
                pkey=private_key,
                timeout=timeout
            )
        else:
            ssh.connect(
                hostname=host_info['ip'],
                port=host_info.get('port', 22),
                username=host_info['username'],
                password=host_info['password'],
                timeout=timeout
            )
        
        stdin, stdout, stderr = ssh.exec_command(command)
        result = stdout.read().decode()
        error = stderr.read().decode()
        
        if error:
            print(f"{host_info['hostname']} 执行错误: {error}")
            return None
            
        return result
        
    except Exception as e:
        print(f"{host_info['hostname']} 连接失败: {str(e)}")
        return None
    finally:
        ssh.close()

2.3 完整批量执行示例

将上述功能整合为完整脚本:

def batch_execute(servers_file: str, command: str):
    servers = load_servers(servers_file)
    results = {}
    
    for server in servers:
        hostname = server.get('hostname', 'unknown')
        print(f"正在执行 {hostname}...")
        
        output = execute_remote_command(server, command)
        if output is not None:
            results[hostname] = output
    
    # 打印所有结果
    for hostname, output in results.items():
        print(f"\n=== {hostname} 执行结果 ===")
        print(output)

if __name__ == "__main__":
    batch_execute('servers.json', 'df -h')

三、高级功能实现

3.1 多线程加速执行

使用concurrent.futures实现并发执行:

from concurrent.futures import ThreadPoolExecutor

def concurrent_execute(servers_file: str, command: str, max_workers: int = 5):
    servers = load_servers(servers_file)
    results = {}
    
    def process_server(server):
        hostname = server.get('hostname', 'unknown')
        output = execute_remote_command(server, command)
        if output is not None:
            results[hostname] = output
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_server, server) for server in servers]
        for future in futures:
            future.result()  # 等待所有任务完成
    
    # 打印结果...(同上)

3.2 交互式命令执行

对于需要交互的命令(如sudo提权),需通过invoke_shell()实现:

def interactive_command(host_info: Dict, commands: List[str]):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        ssh.connect(**{k:v for k,v in host_info.items() if k != 'hostname'})
        
        # 创建交互式shell
        shell = ssh.invoke_shell()
        
        for cmd in commands:
            shell.send(cmd + '\n')
            time.sleep(0.5)  # 等待命令执行
        
        # 获取所有输出
        output = ""
        while shell.recv_ready():
            output += shell.recv(65535).decode()
            
        print(output)
        
    finally:
        ssh.close()

3.3 命令执行超时控制

通过select模块实现非阻塞读取和超时控制:

import select

def execute_with_timeout(ssh, command: str, timeout: int = 10) -> Optional[str]:
    try:
        stdin, stdout, stderr = ssh.exec_command(command)
        
        # 设置通道超时
        transport = ssh.get_transport()
        if transport:
            transport.set_keepalive(30)
        
        # 读取输出(带超时)
        max_timeout = timeout
        start_time = time.time()
        output = ""
        
        while time.time() - start_time = max_timeout:
            print("命令执行超时")
            return None
            
        return output
        
    except Exception as e:
        print(f"执行异常: {str(e)}")
        return None

四、完整案例:批量部署应用

以下是一个完整的批量部署案例,包含文件传输和命令执行:

import os
import time
from typing import List, Dict

class BatchDeployer:
    def __init__(self, servers_file: str):
        self.servers = self._load_servers(servers_file)
    
    def _load_servers(self, file_path: str) -> List[Dict]:
        with open(file_path, 'r') as f:
            return json.load(f)
    
    def _get_ssh_client(self, host_info: Dict) -> paramiko.SSHClient:
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        
        if 'key_path' in host_info:
            key = paramiko.RSAKey.from_private_key_file(host_info['key_path'])
            client.connect(
                hostname=host_info['ip'],
                port=host_info.get('port', 22),
                username=host_info['username'],
                pkey=key
            )
        else:
            client.connect(
                hostname=host_info['ip'],
                port=host_info.get('port', 22),
                username=host_info['username'],
                password=host_info['password']
            )
        
        return client
    
    def upload_file(self, host_info: Dict, local_path: str, remote_path: str) -> bool:
        try:
            ssh = self._get_ssh_client(host_info)
            sftp = ssh.open_sftp()
            
            # 确保远程目录存在
            remote_dir = os.path.dirname(remote_path)
            try:
                sftp.chdir(remote_dir)
            except IOError:
                sftp.mkdir(remote_dir)
                sftp.chdir(remote_dir)
            
            sftp.put(local_path, remote_path)
            sftp.close()
            ssh.close()
            return True
        except Exception as e:
            print(f"文件传输失败: {str(e)}")
            return False
    
    def execute_commands(self, host_info: Dict, commands: List[str]) -> Optional[str]:
        ssh = self._get_ssh_client(host_info)
        output = ""
        
        try:
            for cmd in commands:
                stdin, stdout, stderr = ssh.exec_command(cmd)
                output += f"\n执行: {cmd}\n"
                output += stdout.read().decode()
                error = stderr.read().decode()
                if error:
                    output += f"\n错误: {error}"
                    
        finally:
            ssh.close()
            
        return output if output.strip() else None
    
    def deploy_app(self, app_name: str, file_path: str, commands: List[str]):
        results = {}
        
        for server in self.servers:
            hostname = server.get('hostname', 'unknown')
            print(f"\n=== 开始部署 {hostname} ===")
            
            # 1. 上传文件
            remote_path = f"/tmp/{app_name}.tar.gz"
            if not self.upload_file(server, file_path, remote_path):
                results[hostname] = "文件上传失败"
                continue
            
            # 2. 执行部署命令
            cmd_output = self.execute_commands(server, commands)
            results[hostname] = cmd_output or "部署完成但无输出"
            
        # 打印结果
        for hostname, result in results.items():
            print(f"\n{hostname} 部署结果:")
            print(result)

# 使用示例
if __name__ == "__main__":
    deployer = BatchDeployer('servers.json')
    
    # 部署参数
    app_name = "myapp"
    local_file = "./myapp.tar.gz"
    deploy_commands = [
        f"tar -xzf /tmp/{app_name}.tar.gz -C /opt/",
        "systemctl restart myapp",
        "systemctl status myapp"
    ]
    
    deployer.deploy_app(app_name, local_file, deploy_commands)

五、最佳实践与注意事项

5.1 安全建议

1. 禁用AutoAddPolicy,使用known_hosts文件验证主机密钥

2. 密钥文件权限设置为600

3. 避免在代码中硬编码密码,使用环境变量或配置文件

5.2 性能优化

1. 保持SSH连接复用(使用SSHClient实例缓存)

2. 合理设置并发线程数(通常为CPU核心数*2)

3. 对大文件传输使用分块读取

5.3 错误处理

1. 捕获paramiko.SSHException及其子类

2. 处理网络超时和连接中断

3. 记录详细的错误日志

六、总结

本文详细介绍了paramiko模块在批量SSH命令执行中的应用,从基础连接建立到高级并发控制,涵盖了生产环境中的常见需求。通过合理设计,paramiko可以成为强大的自动化运维工具,显著提升管理效率。

关键词:Python、paramiko、SSH、批量执行、自动化运维、密钥认证、多线程、文件传输

简介:本文系统讲解了使用Python paramiko模块实现批量SSH命令执行的方法,包含基础连接、密钥认证、多线程并发、交互式命令执行等高级功能,并提供完整的部署案例和最佳实践建议。

《详解python通过paramiko模块批量执行ssh命令.doc》
将本文以doc文档格式下载到电脑,方便收藏和打印
推荐度:
点击下载文档