#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------
#    This file is part of TISBackup
#
#    TISBackup is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    TISBackup is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with TISBackup.  If not, see <http://www.gnu.org/licenses/>.
#
# -----------------------------------------------------------------------

"""SSH operations and key management utilities."""

import contextlib
import os
import sys

# Silence warnings written to stderr while paramiko is imported.  The
# redirection is scoped to the import: the previous sys.stderr is put back
# and the null device is closed even when the import fails, so the
# ImportError traceback is still visible to the user.
try:
    with open(os.devnull, "w") as _devnull, contextlib.redirect_stderr(_devnull):
        import paramiko
except ImportError as e:
    print(("Error : can not load paramiko library %s" % e))
    raise


def load_ssh_private_key(private_key_path):
    """Load an SSH private key, trying the supported key types in turn.

    The key types are attempted in order of preference:
    1. Ed25519 (most secure, modern)
    2. ECDSA (secure, widely supported)
    3. RSA (legacy, still secure with sufficient key size)

    DSA is not attempted as it is deprecated and insecure.

    Args:
        private_key_path: Path to the private key file

    Returns:
        The paramiko key object produced by the first key class that
        accepts the file.

    Raises:
        paramiko.SSHException: If none of the key classes could load the
            file. The error raised by the last attempt is included in the
            message and chained as ``__cause__``.

    Any exception that is not a ``paramiko.SSHException`` is not caught
    here and propagates from the attempt that raised it.
    """
    # Ordered from most to least preferred.
    key_classes = (paramiko.Ed25519Key, paramiko.ECDSAKey, paramiko.RSAKey)

    last_exception = None
    for key_class in key_classes:
        try:
            return key_class.from_private_key_file(private_key_path)
        except paramiko.SSHException as e:
            # Remember the failure and fall through to the next key type.
            last_exception = e

    # If we get here, none of the key types worked
    raise paramiko.SSHException(
        f"Unable to load private key from {private_key_path}. "
        f"Supported formats: Ed25519 (recommended), ECDSA, RSA. "
        f"DSA keys are no longer supported. "
        f"Last error: {last_exception}"
    ) from last_exception


def ssh_exec(command, ssh=None, server_name="", remote_user="", private_key="", ssh_port=22):
    """Execute ``command`` over SSH and return its exit code and output.

    Uses the provided ``ssh`` connection, or opens a new one to
    ``server_name`` when ``ssh`` is not provided. A connection opened here
    is closed before returning; a connection supplied by the caller is left
    open for the caller to reuse.

    Args:
        command: Command line to execute on the remote side.
        ssh: Already connected paramiko SSH client, or None to connect here.
        server_name: Host to connect to (required when ``ssh`` is not given).
        remote_user: Login name (required when ``ssh`` is not given).
        private_key: Path to the private key file, loaded with
            ``load_ssh_private_key`` (required when ``ssh`` is not given).
        ssh_port: TCP port used for a new connection.

    Returns:
        Tuple ``(exit_code, output)``. ``output`` is everything read from the
        session channel, decoded with undecodable bytes dropped and with all
        single quotes removed. The original contract describes it as the
        concatenation of stdout and stderr.

    Raises:
        AssertionError: If ``ssh`` is not given and one of ``server_name``,
            ``remote_user`` or ``private_key`` is empty.
        paramiko.SSHException: If the private key cannot be loaded.
    """
    own_connection = not ssh
    if own_connection:
        # NOTE: this check is an assert, so it is skipped when Python runs
        # with -O; it is kept as-is to preserve the exception type.
        assert server_name and remote_user and private_key
        mykey = load_ssh_private_key(private_key)

        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(server_name, username=remote_user, pkey=mykey, port=ssh_port)

    try:
        tran = ssh.get_transport()
        chan = tran.open_session()
        try:
            # NOTE(review): set_combine_stderr() is not used; presumably the
            # pty requested here is what delivers stderr on the same stream
            # as stdout -- confirm against the remote side's behaviour.
            chan.get_pty()
            stdout = chan.makefile()

            chan.exec_command(command)
            stdout.flush()

            output_base = stdout.read()
            output = output_base.decode(errors="ignore").replace("'", "")
            exit_code = chan.recv_exit_status()
        finally:
            # Release the session channel whether or not the command ran.
            chan.close()
    finally:
        # Only tear down connections created by this call.
        if own_connection:
            ssh.close()

    return (exit_code, output)