Tools: Essential Guide: Python SSH Automation: Manage 100 Servers From One Script
Python SSH Automation: Manage 100 Servers From One Script
The Problem with Manual SSH
Paramiko: Python's SSH Library
Parallel Execution Across Multiple Servers
File Deployment via SFTP
Get the Full Toolkit Managing infrastructure manually doesn't scale. Here's how to automate SSH operations across fleets of servers with Python. Imagine patching 50 servers one-by-one. That's 50 SSH sessions, 50 copy-paste operations, 50 chances for human error. Want 47 production-ready Python automation scripts including complete server management frameworks? 👉 Python Automation Toolkit What are you automating on your servers? Share in the comments! Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment's permalink. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse
import os
import paramiko
import logging

logging.basicConfig(level=logging.INFO)


class SSHClient:
    """Thin wrapper around paramiko.SSHClient for running remote commands.

    Implements the context-manager protocol so connections are always
    closed, matching the ``with SSHClient(...)`` usage below.
    """

    def __init__(self, hostname: str, username: str, key_path: str):
        """Open an SSH connection to *hostname* with key-based auth.

        Raises paramiko auth/SSH exceptions or socket errors on failure.
        """
        self.hostname = hostname
        self.client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy silently trusts unknown host keys
        # (MITM risk); prefer load_system_host_keys() + RejectPolicy in
        # production fleets.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(
            hostname,
            username=username,
            # Bug fix: paramiko does not expand '~' in key_filename.
            key_filename=os.path.expanduser(key_path),
            timeout=10,
        )

    def __enter__(self) -> "SSHClient":
        # Bug fix: the original class was used in a `with` statement but
        # did not implement the context-manager protocol, which raises
        # AttributeError (__enter__) at runtime.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()

    def run(self, command: str) -> tuple:
        """Run *command* remotely; return (stdout, stderr, exit_code)."""
        stdin, stdout, stderr = self.client.exec_command(command)
        # recv_exit_status() blocks until the remote command completes.
        exit_code = stdout.channel.recv_exit_status()
        return stdout.read().decode(), stderr.read().decode(), exit_code

    def close(self) -> None:
        """Close the underlying SSH connection."""
        self.client.close()


# Usage — guarded so importing this module does not open a connection.
if __name__ == "__main__":
    with SSHClient('server1.example.com', 'ubuntu', '~/.ssh/id_rsa') as ssh:
        output, error, code = ssh.run('df -h')
        print(output)
import os
import paramiko
import logging

logging.basicConfig(level=logging.INFO)


class SSHClient:
    """Thin wrapper around paramiko.SSHClient for running remote commands.

    Implements the context-manager protocol so connections are always
    closed, matching the ``with SSHClient(...)`` usage below.
    """

    def __init__(self, hostname: str, username: str, key_path: str):
        """Open an SSH connection to *hostname* with key-based auth.

        Raises paramiko auth/SSH exceptions or socket errors on failure.
        """
        self.hostname = hostname
        self.client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy silently trusts unknown host keys
        # (MITM risk); prefer load_system_host_keys() + RejectPolicy in
        # production fleets.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(
            hostname,
            username=username,
            # Bug fix: paramiko does not expand '~' in key_filename.
            key_filename=os.path.expanduser(key_path),
            timeout=10,
        )

    def __enter__(self) -> "SSHClient":
        # Bug fix: the original class was used in a `with` statement but
        # did not implement the context-manager protocol, which raises
        # AttributeError (__enter__) at runtime.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()

    def run(self, command: str) -> tuple:
        """Run *command* remotely; return (stdout, stderr, exit_code)."""
        stdin, stdout, stderr = self.client.exec_command(command)
        # recv_exit_status() blocks until the remote command completes.
        exit_code = stdout.channel.recv_exit_status()
        return stdout.read().decode(), stderr.read().decode(), exit_code

    def close(self) -> None:
        """Close the underlying SSH connection."""
        self.client.close()


# Usage — guarded so importing this module does not open a connection.
if __name__ == "__main__":
    with SSHClient('server1.example.com', 'ubuntu', '~/.ssh/id_rsa') as ssh:
        output, error, code = ssh.run('df -h')
        print(output)
import os
import paramiko
import logging

logging.basicConfig(level=logging.INFO)


class SSHClient:
    """Thin wrapper around paramiko.SSHClient for running remote commands.

    Implements the context-manager protocol so connections are always
    closed, matching the ``with SSHClient(...)`` usage below.
    """

    def __init__(self, hostname: str, username: str, key_path: str):
        """Open an SSH connection to *hostname* with key-based auth.

        Raises paramiko auth/SSH exceptions or socket errors on failure.
        """
        self.hostname = hostname
        self.client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy silently trusts unknown host keys
        # (MITM risk); prefer load_system_host_keys() + RejectPolicy in
        # production fleets.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(
            hostname,
            username=username,
            # Bug fix: paramiko does not expand '~' in key_filename.
            key_filename=os.path.expanduser(key_path),
            timeout=10,
        )

    def __enter__(self) -> "SSHClient":
        # Bug fix: the original class was used in a `with` statement but
        # did not implement the context-manager protocol, which raises
        # AttributeError (__enter__) at runtime.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()

    def run(self, command: str) -> tuple:
        """Run *command* remotely; return (stdout, stderr, exit_code)."""
        stdin, stdout, stderr = self.client.exec_command(command)
        # recv_exit_status() blocks until the remote command completes.
        exit_code = stdout.channel.recv_exit_status()
        return stdout.read().decode(), stderr.read().decode(), exit_code

    def close(self) -> None:
        """Close the underlying SSH connection."""
        self.client.close()


# Usage — guarded so importing this module does not open a connection.
if __name__ == "__main__":
    with SSHClient('server1.example.com', 'ubuntu', '~/.ssh/id_rsa') as ssh:
        output, error, code = ssh.run('df -h')
        print(output)
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List


def run_on_servers(
    servers: List[str],
    command: str,
    username: str,
    key_path: str,
    max_workers: int = 10,
) -> Dict[str, dict]:
    """Run *command* on every host in *servers* concurrently.

    Returns a mapping hostname -> result dict with keys 'success',
    'output', 'error', 'exit_code'. Per-host failures (connect/auth
    errors) are reported in the result dict, never raised.
    """
    results: Dict[str, dict] = {}

    def execute_on_server(hostname: str) -> dict:
        try:
            client = SSHClient(hostname, username, key_path)
            try:
                stdout, stderr, code = client.run(command)
            finally:
                # Bug fix: the original leaked the SSH connection when
                # run() raised — close() was only reached on success.
                client.close()
            return {
                'success': code == 0,
                'output': stdout,
                'error': stderr,
                'exit_code': code,
            }
        except Exception as e:
            # Broad catch is deliberate: one bad host must not abort the fleet.
            return {'success': False, 'error': str(e), 'exit_code': -1}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_server = {
            executor.submit(execute_on_server, srv): srv for srv in servers
        }
        for future in as_completed(future_to_server):
            results[future_to_server[future]] = future.result()
    return results


# Patch 50 servers simultaneously — guarded so import has no side effects.
if __name__ == "__main__":
    servers = [f'server{i}.example.com' for i in range(1, 51)]
    # Bug fix: the original command string contained leaked HTML markup
    # from the article's syntax highlighter ('-weight: 600;">sudo ...').
    results = run_on_servers(
        servers, 'sudo apt-get update -q', 'ubuntu', '~/.ssh/id_rsa'
    )
    successful = sum(1 for r in results.values() if r['success'])
    print(f"Updated {successful}/{len(servers)} servers successfully")
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List


def run_on_servers(
    servers: List[str],
    command: str,
    username: str,
    key_path: str,
    max_workers: int = 10,
) -> Dict[str, dict]:
    """Run *command* on every host in *servers* concurrently.

    Returns a mapping hostname -> result dict with keys 'success',
    'output', 'error', 'exit_code'. Per-host failures (connect/auth
    errors) are reported in the result dict, never raised.
    """
    results: Dict[str, dict] = {}

    def execute_on_server(hostname: str) -> dict:
        try:
            client = SSHClient(hostname, username, key_path)
            try:
                stdout, stderr, code = client.run(command)
            finally:
                # Bug fix: the original leaked the SSH connection when
                # run() raised — close() was only reached on success.
                client.close()
            return {
                'success': code == 0,
                'output': stdout,
                'error': stderr,
                'exit_code': code,
            }
        except Exception as e:
            # Broad catch is deliberate: one bad host must not abort the fleet.
            return {'success': False, 'error': str(e), 'exit_code': -1}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_server = {
            executor.submit(execute_on_server, srv): srv for srv in servers
        }
        for future in as_completed(future_to_server):
            results[future_to_server[future]] = future.result()
    return results


# Patch 50 servers simultaneously — guarded so import has no side effects.
if __name__ == "__main__":
    servers = [f'server{i}.example.com' for i in range(1, 51)]
    # Bug fix: the original command string contained leaked HTML markup
    # from the article's syntax highlighter ('-weight: 600;">sudo ...').
    results = run_on_servers(
        servers, 'sudo apt-get update -q', 'ubuntu', '~/.ssh/id_rsa'
    )
    successful = sum(1 for r in results.values() if r['success'])
    print(f"Updated {successful}/{len(servers)} servers successfully")
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List


def run_on_servers(
    servers: List[str],
    command: str,
    username: str,
    key_path: str,
    max_workers: int = 10,
) -> Dict[str, dict]:
    """Run *command* on every host in *servers* concurrently.

    Returns a mapping hostname -> result dict with keys 'success',
    'output', 'error', 'exit_code'. Per-host failures (connect/auth
    errors) are reported in the result dict, never raised.
    """
    results: Dict[str, dict] = {}

    def execute_on_server(hostname: str) -> dict:
        try:
            client = SSHClient(hostname, username, key_path)
            try:
                stdout, stderr, code = client.run(command)
            finally:
                # Bug fix: the original leaked the SSH connection when
                # run() raised — close() was only reached on success.
                client.close()
            return {
                'success': code == 0,
                'output': stdout,
                'error': stderr,
                'exit_code': code,
            }
        except Exception as e:
            # Broad catch is deliberate: one bad host must not abort the fleet.
            return {'success': False, 'error': str(e), 'exit_code': -1}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_server = {
            executor.submit(execute_on_server, srv): srv for srv in servers
        }
        for future in as_completed(future_to_server):
            results[future_to_server[future]] = future.result()
    return results


# Patch 50 servers simultaneously — guarded so import has no side effects.
if __name__ == "__main__":
    servers = [f'server{i}.example.com' for i in range(1, 51)]
    # Bug fix: the original command string contained leaked HTML markup
    # from the article's syntax highlighter ('-weight: 600;">sudo ...').
    results = run_on_servers(
        servers, 'sudo apt-get update -q', 'ubuntu', '~/.ssh/id_rsa'
    )
    successful = sum(1 for r in results.values() if r['success'])
    print(f"Updated {successful}/{len(servers)} servers successfully")
import paramiko
from pathlib import Path


def deploy_file(hostname: str, username: str, key_path: str,
                local_path: str, remote_path: str, port: int = 22) -> None:
    """Copy *local_path* to *remote_path* on *hostname* over SFTP.

    *port* defaults to 22 (the value the original hard-coded).
    Raises paramiko/socket exceptions on connection or transfer failure.
    """
    transport = paramiko.Transport((hostname, port))
    try:
        transport.connect(
            username=username,
            # Expand '~' so callers can pass '~/.ssh/...' paths.
            # NOTE(review): assumes an RSA key; Ed25519/ECDSA keys need the
            # matching paramiko key class — confirm fleet key type.
            pkey=paramiko.RSAKey.from_private_key_file(
                str(Path(key_path).expanduser())
            ),
        )
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.put(local_path, remote_path)
        finally:
            sftp.close()
    finally:
        # Bug fix: the original leaked the transport (and SFTP session)
        # whenever connect() or put() raised.
        transport.close()
    print(f"Deployed {local_path} -> {hostname}:{remote_path}")
import paramiko
from pathlib import Path


def deploy_file(hostname: str, username: str, key_path: str,
                local_path: str, remote_path: str, port: int = 22) -> None:
    """Copy *local_path* to *remote_path* on *hostname* over SFTP.

    *port* defaults to 22 (the value the original hard-coded).
    Raises paramiko/socket exceptions on connection or transfer failure.
    """
    transport = paramiko.Transport((hostname, port))
    try:
        transport.connect(
            username=username,
            # Expand '~' so callers can pass '~/.ssh/...' paths.
            # NOTE(review): assumes an RSA key; Ed25519/ECDSA keys need the
            # matching paramiko key class — confirm fleet key type.
            pkey=paramiko.RSAKey.from_private_key_file(
                str(Path(key_path).expanduser())
            ),
        )
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.put(local_path, remote_path)
        finally:
            sftp.close()
    finally:
        # Bug fix: the original leaked the transport (and SFTP session)
        # whenever connect() or put() raised.
        transport.close()
    print(f"Deployed {local_path} -> {hostname}:{remote_path}")
import paramiko
from pathlib import Path


def deploy_file(hostname: str, username: str, key_path: str,
                local_path: str, remote_path: str, port: int = 22) -> None:
    """Copy *local_path* to *remote_path* on *hostname* over SFTP.

    *port* defaults to 22 (the value the original hard-coded).
    Raises paramiko/socket exceptions on connection or transfer failure.
    """
    transport = paramiko.Transport((hostname, port))
    try:
        transport.connect(
            username=username,
            # Expand '~' so callers can pass '~/.ssh/...' paths.
            # NOTE(review): assumes an RSA key; Ed25519/ECDSA keys need the
            # matching paramiko key class — confirm fleet key type.
            pkey=paramiko.RSAKey.from_private_key_file(
                str(Path(key_path).expanduser())
            ),
        )
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.put(local_path, remote_path)
        finally:
            sftp.close()
    finally:
        # Bug fix: the original leaked the transport (and SFTP session)
        # whenever connect() or put() raised.
        transport.close()
    print(f"Deployed {local_path} -> {hostname}:{remote_path}")