我们从 Python 开源项目中,提取了以下 14 个代码示例,用于说明如何使用 paramiko.SSHConfig()。
def secure_copy(user, host, src, dest, key_filename=None, allow_agent=True):
    """Fetch ``src`` from ``host`` into ``dest`` through an SCP session.

    If ``~/.ssh/config`` exists and defines a ``ProxyCommand`` for ``host``,
    the connection is tunnelled through that command.

    :param user: user name used to authenticate on ``host``.
    :param host: host (or ssh_config alias) to connect to.
    :param src: first argument handed to ``SCPClient.get``.
    :param dest: second argument handed to ``SCPClient.get``.
    :param key_filename: forwarded to ``_load_keys``.
    :param allow_agent: forwarded to ``_load_keys``.
    """
    keys = _load_keys(key_filename, allow_agent)
    pkey = keys[0]

    proxy = None
    ssh_config_file = os.path.expanduser("~/.ssh/config")
    if os.path.exists(ssh_config_file):
        conf = paramiko.SSHConfig()
        with open(ssh_config_file) as f:
            conf.parse(f)
        host_config = conf.lookup(host)
        if 'proxycommand' in host_config:
            proxy = paramiko.ProxyCommand(host_config['proxycommand'])

    ssh = paramiko.SSHClient()
    try:
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(host, username=user, pkey=pkey, sock=proxy)
        scp = SCPClient(ssh.get_transport())
        try:
            scp.get(src, dest)
        finally:
            # Release the SCP channel even when the transfer fails.
            scp.close()
    finally:
        # The client was previously never closed, leaving the transport
        # open until garbage collection.
        ssh.close()
def get_a_ssh_config(box_name):
    """Give back a map of the ssh configuration of one vagrant machine.

    Runs ``vagrant ssh-config <box_name>``, parses the output as an
    ssh_config file and returns a dict built from the ``_ssh_to_ansible``
    pairs (ssh option name -> output key).
    """
    output = subprocess.check_output(["vagrant", "ssh-config", box_name])
    # check_output returns bytes on Python 3, which a text StringIO rejects;
    # on Python 2 the output is already a ``str`` and is left untouched.
    if not isinstance(output, str):
        output = output.decode()

    config = SSHConfig()
    config.parse(StringIO(output))
    host_config = config.lookup(box_name)

    # man 5 ssh_config:
    # > It is possible to have multiple identity files ...
    # > all these identities will be tried in sequence.
    # Collapse the list to a single path that exists on disk (the last
    # existing candidate wins, as before).
    for identity_path in host_config['identityfile']:
        if os.path.isfile(identity_path):
            host_config['identityfile'] = identity_path

    return dict((v, host_config[k]) for k, v in _ssh_to_ansible)


# List out servers that vagrant has running
# ------------------------------
def get_ssh_key_for_host(host):
    """Return the first ``IdentityFile`` configured for ``host``.

    The user's ``~/.ssh/config`` is consulted when it exists. Returns
    ``None`` when no identity file is configured for the host.

    :raises Exception: when the configured identity file is missing on disk.
    """
    parsed = paramiko.SSHConfig()
    config_path = os.path.expanduser("~/.ssh/config")
    if os.path.exists(config_path):
        with open(config_path) as handle:
            parsed.parse(handle)

    host_entry = parsed.lookup(host)
    if 'identityfile' not in host_entry:
        return None

    path = os.path.expanduser(host_entry['identityfile'][0])
    if os.path.exists(path):
        return path
    raise Exception("Specified IdentityFile "+path
                    + " for " + host + " in ~/.ssh/config not existing anymore.")
def load_from_system_ssh_config(host):
    """Build the ssh configuration for ``host`` from the system and user files.

    The contents of ``/etc/ssh/ssh_config`` and ``$HOME/.ssh/config``
    (``/tmp`` is used when ``HOME`` is unset) are concatenated, parsed with
    paramiko and handed to ``__rescurive_load_from_system_ssh_config``.
    Files that cannot be read are skipped.
    """
    import paramiko

    candidate_files = (
        os.path.join(os.path.sep, 'etc', 'ssh', 'ssh_config'),
        os.path.join(os.getenv('HOME', '/tmp'), '.ssh', 'config'),
    )

    merged_config = StringIO()
    for conf_file in candidate_files:
        try:
            # The context manager closes the handle even if read() fails.
            with open(conf_file) as config:
                merged_config.write(config.read())
            merged_config.write('\n')
        except IOError:
            # Best effort: a missing or unreadable file is simply ignored.
            pass
    merged_config.seek(0, 0)

    paramiko_SSHClient_proxy_config = paramiko.SSHConfig()
    paramiko_SSHClient_proxy_config.parse(merged_config)
    return PluginSSHConfig.__rescurive_load_from_system_ssh_config(paramiko_SSHClient_proxy_config, host)
def get_config():
    """Map every concrete host alias in ``SSH_CONF`` to its looked-up options.

    Returns an empty dict when the file does not exist. Aliases containing
    the wildcards ``?`` or ``*`` are skipped.
    """
    conf_path = os.path.expanduser(SSH_CONF)
    if not os.path.isfile(conf_path):
        return {}

    with open(conf_path) as handle:
        parsed = paramiko.SSHConfig()
        parsed.parse(handle)

    hosts = {}
    for alias in parsed.get_hostnames():
        if any(wildcard in alias for wildcard in ('?', '*')):
            continue
        options = parsed.lookup(alias)
        # An entry that resolves to 'localhost' is reported under its
        # ssh_config alias rather than the literal name.
        if options['hostname'] == 'localhost':
            options['hostname'] = alias
        hosts[alias] = options
    return hosts
def dummytest():
    """
    Code posted on the github issue regarding the SSH Banner Error on the paramiko github.
    https://github.com/paramiko/paramiko/issues/673

    Connects to 10.8.9.160 through the 'bastion' host of ~/.ssh/config and
    prints the output of a trivial remote command.
    """
    import os
    import paramiko
    import logging

    logging.basicConfig(level=logging.DEBUG)

    # Loading ssh configuration to get the IP and user of the desired host (here 'bastion')
    cfg = paramiko.SSHConfig()
    with open(os.path.expanduser("~/.ssh/config")) as f:
        cfg.parse(f)
    host_cfg = cfg.lookup('bastion')
    sock = paramiko.ProxyCommand("ssh {}@{} nc 10.8.9.160 22".format(host_cfg.get('user'), host_cfg.get('hostname')))
    sock.settimeout(30)
    # Sock stays open until a client tries to use it (or the program exits)

    # Client Setup
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        # Connect and execute command
        # The following line hangs for 15 seconds (increase with banner_timeout argument) and raises an SSHError
        client.connect("10.8.9.160", username='root', sock=sock)
        (_stdin, stdout, _stderr) = client.exec_command("echo 'Hello World !'")
        for line in stdout.readlines():
            print(line)
    finally:
        # Close the client on the failure path too, which is the path this
        # reproduction is expected to take.
        client.close()
def paramiko_connect(host):
    """Open an SSH connection using ``options`` and ``~/.ssh/config``.

    The target host name and user name are read from the module-level
    ``options`` mapping and then overridden by any ``HostName``, ``User``
    and ``Port`` entries that ``~/.ssh/config`` defines for that host. A
    ``ProxyCommand`` entry is used as the connection socket.

    Exits the process with status 1 when ``~/.ssh/config`` does not exist.

    :param host: NOTE(review): currently unused, the target is taken from
        ``options['hostname']`` -- confirm whether that is intended.
    :returns: the connected ``paramiko.SSHClient`` (previously the ``None``
        result of ``connect()`` was returned, which lost the client).
    """
    client = paramiko.SSHClient()
    # The earlier ``client._policy = WarningPolicy()`` assignment was dead:
    # this call overwrote it immediately.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    ssh_config = paramiko.SSHConfig()
    user_config_file = os.path.expanduser("~/.ssh/config")
    try:
        with open(user_config_file) as f:
            ssh_config.parse(f)
    except FileNotFoundError:
        print("{} file could not be found. Aborting.".format(user_config_file))
        sys.exit(1)

    cfg = {'hostname': options['hostname'], 'username': options["username"]}
    user_config = ssh_config.lookup(cfg['hostname'])

    if 'hostname' in user_config:
        cfg['hostname'] = user_config['hostname']
    # ssh_config spells the option ``User``; ``username`` is still honoured
    # for backward compatibility with the previous lookup.
    for user_key in ('user', 'username'):
        if user_key in user_config:
            cfg['username'] = user_config[user_key]
            break
    if 'port' in user_config:
        # Config values are strings, while connect() compares the port with
        # the integer default when naming the host key.
        cfg['port'] = int(user_config['port'])
    if 'proxycommand' in user_config:
        cfg['sock'] = paramiko.ProxyCommand(user_config['proxycommand'])

    client.connect(**cfg)
    return client
def _get_ssh_config(config_path='~/.ssh/config'):
    """Parse the ssh configuration file found at ``config_path``.

    A file that cannot be opened or read is ignored, in which case the
    returned configuration is empty.

    Returns:
        paramiko.SSHConfig: the (possibly empty) configuration instance.
    """
    parsed = paramiko.SSHConfig()
    try:
        resolved_path = os.path.realpath(os.path.expanduser(config_path))
        with open(resolved_path) as handle:
            parsed.parse(handle)
    except IOError:
        # Unreadable configuration: fall through with the empty instance.
        pass
    return parsed
def __init__(self):
    """Set every builder option to its default value."""
    super(_SSHClientBuilder, self).__init__()

    # Connection target and credentials.
    self._hostname = None
    self._port = 22
    self._username = self._password = None

    # paramiko objects and classes used when building the client.
    self._config = paramiko.SSHConfig()
    self._client_class = paramiko.SSHClient
    self._missing_host_key_policy = paramiko.AutoAddPolicy

    # Both timeouts start from the same module-level default.
    self._timeout = self._banner_timeout = DEFAULT_TIMEOUT

    # Optional settings, unset by default.
    self._allow_agent = self._proxy_command = self._sock = None
def read_ssh_config():
    """Read the current user's ``~/.ssh/config``.

    Any failure while opening or parsing the file is logged as a warning
    instead of being raised.

    :returns: the ``SSHConfig`` instance, or ``None`` when the file could
        not be opened.
    """
    # expanduser() does not raise when HOME is unset, unlike the former
    # ``os.environ['HOME']`` lookup, which failed outside the try block.
    ssh_config_file = os.path.join(os.path.expanduser('~'), '.ssh', 'config')
    ssh_cfg = None
    try:
        with open(ssh_config_file) as ssh_cfg_fh:
            ssh_cfg = SSHConfig()
            ssh_cfg.parse(ssh_cfg_fh)
    except Exception as err:  # pylint: disable=broad-except
        LOG.warning("Default SSH configuration could not be read: {}".format(err))
    return ssh_cfg
def gethostconfig(self, file, host):
    """Return the ssh_config options that apply to ``host``.

    :param file: path of the ssh configuration file; ``~`` is expanded.
    :param host: host name to look up.
    :returns: the result of ``SSHConfig.lookup(host)``, or an empty dict
        when ``file`` does not exist.
    :raises BackendException: when the file cannot be read or parsed.
    """
    file = os.path.expanduser(file)
    if not os.path.isfile(file):
        return {}

    # Imported only once a configuration file is known to exist.
    import paramiko

    sshconfig = paramiko.SSHConfig()
    try:
        # The context manager closes the handle, which was previously leaked.
        with open(file) as config_fh:
            sshconfig.parse(config_fh)
    except Exception:
        raise BackendException("could not load '%s', maybe corrupt?" % (file))
    return sshconfig.lookup(host)
def through(self, host, user='root'):
    """
    Defines a proxy command to rebound on the host.
    This method is actually unsafe to use and will cause an SSH Banner Error. This library can't work through a proxy

    When ``host`` is an alias declared in ~/.ssh/config, its ``HostName``
    and ``User`` entries are used; ``user`` is the fallback when the entry
    has no ``User``.

    :param string host: Either a valid name stored in ~/.ssh/config or a valid IP address.
    :param string user: A valid user for the host. Default : 'root'.
    :return: This instance. Allows to chain methods.
    :rtype: CloudApp
    """
    ssh_conf_file = os.path.expanduser("~/.ssh/config")
    try:
        ssh_conf = paramiko.SSHConfig()
        with open(ssh_conf_file) as f:
            ssh_conf.parse(f)
        if host in ssh_conf.get_hostnames():
            host_conf = ssh_conf.lookup(host)
            # Fall back on the caller's ``user`` rather than a hard-coded
            # 'root', which silently discarded the argument.
            user = host_conf.get('user', user)
            host = host_conf.get('hostname')
        else:
            print("Could not find host {} in {}. Using raw hostname.".format(host, ssh_conf_file))
    except FileNotFoundError as e:
        print("Could not load {} : {}. Using raw hostname.".format(ssh_conf_file, e))

    # "ssh -W %h:%p {}@{}" doesn't create an entry in the logs. "ssh {}@{} nc %h %p"
    # Actually connects to name (causing an entry in the logs)
    proxy_command = "ssh {}@{} nc {} 22".format(user, host, self.instance.private_ip_address)
    print(proxy_command)
    self.ssh_sock = paramiko.ProxyCommand(proxy_command)
    return self
def ssh_connection(self):
    """Reusable :obj:`paramiko.client.SSHClient`.

    The client is created on first use and cached on the instance as
    ``_ssh_connection``. Connection parameters start from ``self.host``,
    ``self.username``, ``self.password`` and ``self.ssh_port`` and are
    overridden by the ``HostName``, ``User``, ``Port``, ``ProxyCommand``
    and ``IdentityFile`` entries that ``~/.ssh/config`` defines for the
    host.
    """
    if hasattr(self, "_ssh_connection"):
        return self._ssh_connection

    # TODO configurable
    ssh_config_file = os.path.join(os.path.expanduser("~"), ".ssh", "config")

    client = paramiko.SSHClient()
    # The earlier ``client._policy = WarningPolicy()`` assignment was dead:
    # this call overwrote it immediately.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    ssh_config = paramiko.SSHConfig()
    if os.path.exists(ssh_config_file):
        with open(ssh_config_file) as f:
            ssh_config.parse(f)

    parameters = {
        "hostname": self.host,
        "username": self.username,
        "password": self.password,
        "port": self.ssh_port,
    }

    user_config = ssh_config.lookup(self.host)
    if 'hostname' in user_config:
        parameters['hostname'] = user_config['hostname']
    # ssh_config spells the option ``User``; ``username`` is still honoured
    # for backward compatibility with the previous lookup.
    for user_key in ('user', 'username'):
        if user_key in user_config:
            parameters['username'] = user_config[user_key]
            break
    if 'port' in user_config:
        # Config values are strings, while connect() compares the port with
        # the integer default when naming the host key.
        parameters['port'] = int(user_config['port'])

    if 'proxycommand' in user_config:
        parameters['sock'] = paramiko.ProxyCommand(user_config['proxycommand'])

    # TODO configurable: allow an explicit key file to be passed as
    # parameters['key_filename'].
    if 'identityfile' in user_config:
        parameters['key_filename'] = user_config['identityfile']

    client.connect(**parameters)

    self._ssh_connection = client
    return client
def ssh(host, forward_agent=False, sudoable=False, max_attempts=1, max_timeout=5):
    """Yield a connection to ``host``, closing the SSH client afterwards.

    Settings found for the host in ~/.ssh/config (user, proxy command,
    identity file, port) are applied when that file exists.

    :param host: the server to connect to
    :type host: str
    :param forward_agent: forward the local agents
    :type forward_agent: bool
    :param sudoable: allow sudo commands
    :type sudoable: bool
    :param max_attempts: the maximum attempts to connect to the desired host
    :type max_attempts: int
    :param max_timeout: connect timeout, also the number of seconds slept
        between two attempts
    :type max_timeout: int
    :returns a SSH connection to the desired host
    :rtype: Connection
    :raises MaxConnectionAttemptsError: Exceeded the maximum attempts to
        establish the SSH connection.
    """
    with closing(SSHClient()) as client:
        client.set_missing_host_key_policy(AutoAddPolicy())

        connect_kwargs = {"hostname": host, "timeout": max_timeout}

        config_path = os.path.expanduser("~/.ssh/config")
        if os.path.exists(config_path):
            parsed = SSHConfig()
            with open(config_path) as handle:
                parsed.parse(handle)

            host_entry = parsed.lookup(host)
            if "user" in host_entry:
                connect_kwargs["username"] = host_entry["user"]
            if "proxycommand" in host_entry:
                connect_kwargs["sock"] = ProxyCommand(host_entry["proxycommand"])
            if "identityfile" in host_entry:
                connect_kwargs['key_filename'] = host_entry['identityfile']
            if "port" in host_entry:
                connect_kwargs["port"] = int(host_entry["port"])

        for attempt in range(1, max_attempts + 1):
            try:
                client.connect(**connect_kwargs)
            except socket.error:
                # Out of attempts: report the failure instead of sleeping.
                if attempt >= max_attempts:
                    raise MaxConnectionAttemptsError(
                        "Exceeded max attempts to connect to host: {0}".format(max_attempts)
                    )
                time.sleep(max_timeout)
            else:
                break

        yield Connection(client, forward_agent, sudoable)