Tools: Essential Guide: Python SSH Automation: Manage 100 Servers From One Script

Tools: Essential Guide: Python SSH Automation: Manage 100 Servers From One Script

Python SSH Automation: Manage 100 Servers From One Script

The Problem with Manual SSH

Paramiko: Python's SSH Library

Parallel Execution Across Multiple Servers

File Deployment via SFTP

Get the Full Toolkit Managing infrastructure manually doesn't scale. Here's how to automate SSH operations across fleets of servers with Python. Imagine patching 50 servers one-by-one. That's 50 SSH sessions, 50 copy-paste operations, 50 chances for human error. Want 47 production-ready Python automation scripts including complete server management frameworks? 👉 Python Automation Toolkit What are you automating on your servers? Share in the comments! Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment's permalink. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse

Command

Copy

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Tuple

import paramiko

logging.basicConfig(level=logging.INFO)


class SSHClient:
    """Thin wrapper around ``paramiko.SSHClient`` for key-based command execution.

    The connection is opened in ``__init__`` and released by ``close()``.
    Instances are context managers, so ``with SSHClient(...) as ssh:`` closes
    the connection even when the body raises.
    """

    def __init__(self, hostname: str, username: str, key_path: str):
        """Connect to *hostname* as *username* using the private key at *key_path*.

        A leading ``~`` in *key_path* is expanded here, because the path is
        otherwise handed to paramiko unchanged.
        """
        self.hostname = hostname
        self.client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts any unknown host key without
        # verification. Kept for backward compatibility; prefer known_hosts
        # verification for production use.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.client.connect(
                hostname,
                username=username,
                key_filename=str(Path(key_path).expanduser()),
                timeout=10,
            )
        except Exception:
            # Release whatever was opened before the failure, then re-raise.
            self.client.close()
            raise

    def __enter__(self) -> "SSHClient":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def run(self, command: str) -> Tuple[str, str, int]:
        """Execute *command* remotely and return ``(stdout, stderr, exit_code)``."""
        _stdin, stdout, stderr = self.client.exec_command(command)
        # Drain the output streams BEFORE waiting for the exit status: waiting
        # first can block forever when the command writes more output than
        # the channel window holds.
        out_text = stdout.read().decode()
        err_text = stderr.read().decode()
        exit_code = stdout.channel.recv_exit_status()
        return out_text, err_text, exit_code

    def close(self):
        """Close the underlying SSH connection."""
        self.client.close()


# Usage
with SSHClient('server1.example.com', 'ubuntu', '~/.ssh/id_rsa') as ssh:
    output, error, code = ssh.run('df -h')
    print(output)


def run_on_servers(
    servers: List[str],
    command: str,
    username: str,
    key_path: str,
    max_workers: int = 10
) -> Dict[str, dict]:
    """Run *command* on every host in *servers* using a thread pool.

    Returns a dict keyed by hostname. Each value has the keys ``'success'``
    (True when the exit code is 0), ``'output'``, ``'error'`` and
    ``'exit_code'``. A host that cannot be reached, or whose command raises,
    is reported with ``success=False``, an empty ``'output'``, the exception
    text in ``'error'`` and ``exit_code=-1``; the exception is not propagated.
    """
    results = {}

    def execute_on_server(hostname):
        try:
            # The context manager closes the connection even if run() raises.
            with SSHClient(hostname, username, key_path) as client:
                stdout, stderr, code = client.run(command)
        except Exception as e:
            # Per-host boundary: one bad host must not abort the whole batch.
            return {
                'success': False,
                'output': '',
                'error': str(e),
                'exit_code': -1
            }
        return {
            'success': code == 0,
            'output': stdout,
            'error': stderr,
            'exit_code': code
        }

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_server = {
            executor.submit(execute_on_server, srv): srv
            for srv in servers
        }
        for future in as_completed(future_to_server):
            server = future_to_server[future]
            results[server] = future.result()

    return results


# Patch 50 servers simultaneously
servers = [f'server{i}.example.com' for i in range(1, 51)]
results = run_on_servers(servers, 'sudo apt-get update -q', 'ubuntu', '~/.ssh/id_rsa')

successful = sum(1 for r in results.values() if r['success'])
print(f"Updated {successful}/{len(servers)} servers successfully")


def deploy_file(hostname: str, username: str, key_path: str,
                local_path: str, remote_path: str, port: int = 22):
    """Upload *local_path* to *remote_path* on *hostname* over SFTP.

    Authenticates as *username* with the RSA private key at *key_path*
    (a leading ``~`` is expanded). *port* defaults to 22. The SFTP session
    and the transport are closed even when the key load, the authentication
    or the upload fails; the exception is then propagated to the caller.
    """
    transport = paramiko.Transport((hostname, port))
    try:
        private_key = paramiko.RSAKey.from_private_key_file(
            str(Path(key_path).expanduser())
        )
        transport.connect(username=username, pkey=private_key)
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.put(local_path, remote_path)
        finally:
            sftp.close()
    finally:
        transport.close()
    print(f"Deployed {local_path} -> {hostname}:{remote_path}")