我们从 Python 开源项目中提取了以下 47 个代码示例，用于说明如何使用 pexpect.TIMEOUT。
def ssh_session(ip, versions, latest=None):
    """Run the emccbuild build on a remote host over SSH.

    Logs in to ``ip`` as ``ec2-user``, performs a ``docker login``, runs
    ``emccbuild.py`` for the requested versions and logs out again.  If any
    expect step times out, the session is handed over to the user
    interactively instead of failing.

    The credentials (``ssh_key``, ``user``, ``passwd``, ``email``) are read
    from names defined outside this function.

    :param ip: host passed to ``pxssh.login``.
    :param versions: iterable of strings, space-joined after ``-v``.
    :param latest: optional value appended to the build command as ``-t``.
    """
    session = pxssh.pxssh(encoding='utf-8')
    session.login(ip, 'ec2-user', ssh_key=ssh_key)
    try:
        print_remote_step('Docker login')
        session.sendline("docker login -u '%s' -p '%s'" % (user, passwd))
        session.expect_exact('Email:', timeout=120)
        session.sendline(email)
        session.expect_exact('Login Succeeded', timeout=120)
        log(session.before)

        build_cmd = "python emccbuild.py -l build -v %s -p" % (' '.join(versions))
        if latest is not None:
            build_cmd = build_cmd + " -t %s" % (latest)
        print_remote_step(build_cmd)
        session.sendline(build_cmd)
        # The build can be slow, hence the long prompt timeout.
        session.prompt(timeout=1200)
        log(session.before)
        session.logout()
    except pexpect.TIMEOUT:
        print_remote_step('Expect Timeout reached, going interactive')
        session.interact()
def prompt(self, timeout=-1):
    '''Wait for the next shell prompt.

    This is a thin convenience wrapper around
    :meth:`~pexpect.spawn.expect`.  If :meth:`login` was called with
    ``auto_prompt_reset=False``, the :attr:`PROMPT` attribute must be set to
    a regex matching the prompt before calling this method.

    Calling :meth:`prompt` erases the contents of the :attr:`before`
    attribute even if no prompt is ever matched.

    A ``timeout`` that is omitted or equal to -1 means "use self.timeout".

    :return: True if the shell prompt was matched, False if the timeout was
             reached.
    '''
    effective_timeout = self.timeout if timeout == -1 else timeout
    # Index 0 is the prompt, index 1 is the TIMEOUT sentinel.
    matched_index = self.expect([self.PROMPT, TIMEOUT],
                                timeout=effective_timeout)
    return matched_index != 1
def collect_log(self, session):
    """Accumulate output from a connected session until the thread is stopped.

    Repeatedly reads from ``session`` with ``read_nonblocking`` and stores
    everything read so far in ``self.data``.  The loop ends when
    ``self.stop_thread_flag`` becomes true or when an exception other than
    ``pexpect.TIMEOUT`` occurs (that exception is reported through
    ``print_exception``).

    :param session: object providing ``read_nonblocking(size, timeout=...)``.
    """
    collected = " "
    while not self.stop_thread_flag:
        try:
            # 30s mirrors the default timeout of a pexpect spawn object
            chunk = session.read_nonblocking(1024, timeout=30)
            collected += chunk
            time.sleep(0.5)
            self.data = collected
        except pexpect.TIMEOUT:
            # Nothing arrived in time; keep polling until told to stop.
            continue
        except Exception as exception:
            print_exception(exception)
            break
def __step_initialisation(self):
    """Wait for the child process to report the result of loading the profile.

    Expects one of: timeout, CONFIG_LOADED or FAIL_TO_LOAD.  On success the
    connect step is started; otherwise ``fatal`` is called and the session
    is disconnected.

    :return: None
    """
    outcome = self._child.expect([pexpect.TIMEOUT, CONFIG_LOADED, FAIL_TO_LOAD])
    if outcome == 0:
        fatal("Timeout while executing ikec")
        self.disconnect()
    elif outcome == 1:
        self.logger.info("Config loaded")
        self.__step_send_connect()
    elif outcome == 2:
        fatal("Fail to load site configuration for %s" % self.profile_name)
        self.disconnect()
def __step_send_connect(self):
    """Send the connect command once the configuration has been loaded.

    Sends ``c`` to the child process and waits for TUNNEL_ENABLED, DETACHED
    or a timeout.  A successful tunnel enters the monitor loop; the other
    two outcomes trigger a retry after RETRY_SLEEP_DURATION.

    :return: None
    """
    self.logger.info("Sending C command to connect")
    self._child.sendline('c')
    outcome = self._child.expect([pexpect.TIMEOUT, TUNNEL_ENABLED, DETACHED])
    if outcome == 0:
        self.logger.error("Ikec timeout. Cannot establish tunnel. Retrying")
        self.__retry_with_sleep(RETRY_SLEEP_DURATION)
    elif outcome == 1:
        self.logger.info("Tunnel established")
        self.__monitor_loop()
    elif outcome == 2:
        self.logger.info("Detached from key daemon")
        self.__retry_with_sleep(RETRY_SLEEP_DURATION)
def __monitor_loop(self):
    """Watch the established tunnel and react when it is closed.

    Marks the application as STARTED | CONNECTED, then polls the child
    process with a 10 second expect timeout.  A timeout simply means nothing
    changed.  When DETACHED is seen, the state goes back to CONNECTING and a
    retry is scheduled.  Any exception ends the loop and disconnects; it is
    logged unless the application is already stopping.

    :return: None
    """
    self.logger.info('Monitoring changes of the tunnel')
    self.set_state(APP_STATES.STARTED | APP_STATES.CONNECTED)
    try:
        while True:
            # Identity test: the timer is either unset (None) or an object.
            if self._monitor_timer is None and PING_ENABLED:
                self.__create_monitor_thread()
            i = self._child.expect([pexpect.TIMEOUT, DETACHED], timeout=10)
            if i == 0:
                # Nothing new within the polling interval.
                continue
            if i == 1:
                self.set_state(APP_STATES.STARTED | APP_STATES.CONNECTING)
                self.logger.info("Tunnel has been closed. Retrying to establish the connection")
                self.__retry_with_sleep(RETRY_SLEEP_DURATION)
    except Exception as e:
        if not (self._state & APP_STATES.STOPPING):
            self.logger.error("Quiting monitoring loop due to exception %s" % e)
        self.disconnect()
def expect_setup_callback(e, callback):
    """Install a callback that runs about once a second while waiting in expect().

    The original ``e.expect`` is saved as ``e.expect_saved`` and replaced by
    a wrapper that waits in one-second slices, invoking ``callback(e)``
    after each slice that times out.  When the overall timeout elapses, the
    wrapper prints a message and raises ``pexpect.TIMEOUT``.

    :param e: the pexpect-like object to patch (must have ``expect`` and
        ``timeout`` attributes).
    :param callback: callable invoked with ``e`` after each one-second timeout.
    """
    import pexpect

    def _expect_callback(pattern, timeout=e.timeout):
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                return e.expect_saved(pattern, timeout=1)
            except pexpect.TIMEOUT:
                e.expect_user_callback(e)
        print("Timed out looking for %s" % pattern)
        raise pexpect.TIMEOUT(timeout)

    e.expect_user_callback = callback
    e.expect_saved = e.expect
    e.expect = _expect_callback
def expect_exact(self, pattern_list, timeout = -1, searchwindowsize = -1):
    """This is similar to expect(), but uses plain string matching instead
    of compiled regular expressions in 'pattern_list'. The 'pattern_list'
    may be a string; a list or other sequence of strings; or TIMEOUT and
    EOF.

    This call might be faster than expect() for two reasons: string
    searching is faster than RE matching and it is possible to limit the
    search to just the end of the input buffer.

    This method is also useful when you don't want to have to worry about
    escaping regular expression characters that you want to match."""

    # A single pattern is promoted to a one-element list.  isinstance() is
    # used rather than an exact type test so that subclasses of the string
    # types are also recognised as a single pattern.
    if isinstance(pattern_list, types.StringTypes) or pattern_list in (TIMEOUT, EOF):
        pattern_list = [pattern_list]
    return self.expect_loop(searcher_string(pattern_list), timeout, searchwindowsize)
def __init__(self, patterns):
    """This creates an instance that searches for 'patterns' Where
    'patterns' may be a list or other sequence of compiled regular
    expressions, or the EOF or TIMEOUT types.

    The position of EOF / TIMEOUT in 'patterns' is recorded in
    ``eof_index`` / ``timeout_index`` (-1 when absent); every other entry is
    stored in ``_searches`` as an ``(index, pattern)`` pair."""

    self.eof_index = -1
    self.timeout_index = -1
    self._searches = []
    # enumerate() keeps the original positions without needing len().
    for n, s in enumerate(patterns):
        if s is EOF:
            self.eof_index = n
            continue
        if s is TIMEOUT:
            self.timeout_index = n
            continue
        self._searches.append((n, s))
def query(self, cmd):
    """Send ``cmd`` to the console and classify the response.

    Returns the text before the match when the first pattern matches,
    ``'^exit'`` (after closing this object) when the second matches, and
    ``'^kill'`` (after closing the console) when the third matches.

    Raises ``errors.DeadConsoleError`` if the console is not alive,
    ``errors.ConsoleSessionError`` on timeout or EOF, and
    ``errors.UnexpectedResponseError`` if no branch returned.
    """
    if not self.con.isalive():
        raise errors.DeadConsoleError
    self.con.sendline(cmd)
    try:
        # Final order is [prompt_re, exit_re, ps1_re], with the two optional
        # patterns omitted when they are falsy.
        # NOTE(review): the index checks below assume all three patterns are
        # present; when prompt_re or exit_re is missing the indices shift.
        # Confirm whether that is intended.
        optional = [p for p in (self.prompt_re, self.exit_re) if p]
        patterns = optional + [self.ps1_re]
        matched = self.con.expect(patterns)
        if matched == 0:
            return self.con.before
        if matched == 1:
            self.close()
            return '^exit'
        if matched == 2:
            self.con.close()
            return '^kill'
    except (pexpect.TIMEOUT, pexpect.EOF):
        ## Connection's probably dead, close the socket
        self.close()
        raise errors.ConsoleSessionError
    raise errors.UnexpectedResponseError
def main():
    """Loop forever, checking the ``ps`` output for the ssh tunnel.

    Each pass runs ``ps`` and looks for '/usr/bin/ssh'.  When ``ps`` reaches
    EOF without a match, the tunnel is restarted via ``start_tunnel``.  A
    timeout is reported and followed by an extra pause.
    """
    while True:
        ps = pexpect.spawn('ps')
        time.sleep(1)
        index = ps.expect(['/usr/bin/ssh', pexpect.EOF, pexpect.TIMEOUT])
        timed_out = (index == 2)
        tunnel_missing = (index == 1)

        if timed_out:
            print('TIMEOUT in ps command...')
            print(str(ps))
            time.sleep(13)

        # The timeout case also falls through to the "else" pause below.
        if tunnel_missing:
            print(time.asctime(), end=' ')
            print('restarting tunnel')
            start_tunnel()
            time.sleep(11)
            print('tunnel OK')
        else:
            # print 'tunnel OK'
            time.sleep(7)
def run(self):
    """Listen on the connection and dispatch matched events.

    While ``self._parent_aliveness`` is set, waits (0.5 s at a time) for any
    of the registered event patterns.  On a match the event dict is filled
    with ``before``/``after``/``match``, its ``event`` is set and its
    callback, if any, is invoked.  On EOF or ``NotConnectedError`` the
    "disconnected" event is set and the loop ends.
    """
    registered = list(self._event_vector.values())
    patterns = [entry["pattern"] for entry in registered]

    log.info('Running...')
    while self._parent_aliveness.is_set():
        try:
            matched_index = self._connection.expect(patterns, timeout=.5)
        except pexpect.TIMEOUT:
            # Nothing matched in this slice; re-check the aliveness flag.
            continue
        except (NotConnectedError, pexpect.EOF):
            self._event_vector["disconnected"]["event"].set()
            break

        fired = registered[matched_index]
        fired["before"] = self._connection.before
        fired["after"] = self._connection.after
        fired["match"] = self._connection.match
        fired["event"].set()
        handler = fired["callback"]
        if handler:
            handler(fired)
    log.info("Listener thread finished")
def clear_bond(self, address=None):
    """Use the 'bluetoothctl' program to erase a stored BLE bond.

    Spawns ``sudo bluetoothctl``, sends ``remove <ADDRESS>`` and waits
    briefly for a response.  A timeout while waiting for the response is
    logged as an error; success is only logged when a response arrived.

    :param address: address of the device whose bond should be removed; it
        is upper-cased before being sent.
    :raises pexpect.TIMEOUT: if the initial "bluetooth" banner is not seen
        within one second.
    """
    con = pexpect.spawn('sudo bluetoothctl')
    con.expect("bluetooth", timeout=1)

    log.info("Clearing bond for %s", address)
    con.sendline("remove " + address.upper())
    try:
        con.expect(
            ["Device has been removed", "# "],
            timeout=.5
        )
    except pexpect.TIMEOUT:
        log.error("Unable to remove bonds for %s: %s",
                  address, con.before)
    else:
        # Only report success when the wait above did not time out.
        log.info("Removed bonds for %s", address)
def send_command(self, cmd, password=False):
    """Send a command, or a password, to the session.

    For a regular command the text is sent, its echo is awaited (up to
    15 s) and then a newline is sent.  For a password the method first waits
    up to 10 s for terminal echo to be switched off, logs the outcome, and
    sends the line either way.

    :param cmd: text to send.
    :param password: True when ``cmd`` is a password.
    :raises ConnectionError: if the session is already disconnected
        (an ``OSError`` was raised while sending).
    """
    try:
        if not password:
            self.send(cmd)  # pylint: disable=no-member
            self.expect_exact([cmd, pexpect.TIMEOUT], timeout=15)  # pylint: disable=no-member
            self.sendline()  # pylint: disable=no-member
        else:
            echo_timeout = 10
            self._connection.log("Waiting for ECHO OFF")
            echo_off = self.waitnoecho(echo_timeout)  # pylint: disable=no-member
            if echo_off:
                self._connection.log("Password ECHO OFF received")
            else:
                self._connection.log("Password ECHO OFF not received within {}s".format(echo_timeout))
            self.sendline(cmd)  # pylint: disable=no-member
    except OSError:
        self._connection.log("Session already disconnected.")
        raise ConnectionError("Session already disconnected")
def enable(self, enable_password):
    """Switch the device to privileged mode.

    Does nothing if the current prompt already ends with '#'.  Otherwise
    sends the enable command and drives the "IOS-ENABLE" state machine.

    :param enable_password: password sent when the password prompt appears.
    :raises ConnectionAuthenticationError: if the prompt still does not end
        with '#' after the state machine has run.
    """
    if self.device.prompt[-1] == '#':
        self.log("Device is already in privileged mode")
        return

    password_re = self.password_re
    prompt_re = self.device.prompt_re

    events = [password_re, prompt_re, pexpect.TIMEOUT, pexpect.EOF]
    # NOTE(review): each tuple is presumably (event, source states,
    # next state, action, timeout) -- confirm against the FSM class.
    transitions = [
        (password_re, [0], 1, partial(a_send_password, enable_password), 10),
        (password_re, [1], -1,
         ConnectionAuthenticationError("Incorrect enable password", self.device.hostname), 0),
        (prompt_re, [0, 1, 2, 3], -1, a_expected_prompt, 0),
        (pexpect.TIMEOUT, [0, 1, 2], -1,
         ConnectionAuthenticationError("Unable to get privileged mode", self.device.hostname), 0),
        (pexpect.EOF, [0, 1, 2], -1, ConnectionError("Device disconnected"), 0)
    ]
    self.device.ctrl.send_command(self.enable_cmd)
    state_machine = FSM("IOS-ENABLE", self.device, events, transitions,
                        timeout=10, max_transitions=5)
    state_machine.run()
    if self.device.prompt[-1] != '#':
        raise ConnectionAuthenticationError("Privileged mode not set", self.device.hostname)
def reload(self, reload_timeout=300, save_config=True):
    """Reload the device by driving the "IOS-RELOAD" state machine.

    Example dialogue handled here::

        CSM_DUT#reload

        System configuration has been modified. Save? [yes/no]: yes
        Building configuration...
        [OK]
        Proceed with reload? [confirm]

    :param reload_timeout: timeout used after confirming the reload.
    :param save_config: answer "yes" (True) or "no" (False) to the save
        prompt.
    :return: whatever ``FSM.run()`` returns.
    """
    if save_config:
        response = "yes"
    else:
        response = "no"

    events = [SAVE_CONFIG, PROCEED, pexpect.TIMEOUT, pexpect.EOF]
    # NOTE(review): each tuple is presumably (event, source states,
    # next state, action, timeout) -- confirm against the FSM class.
    transitions = [
        (SAVE_CONFIG, [0], 1, partial(a_send_line, response), 60),
        (PROCEED, [0, 1], 2, partial(a_send, "\r"), reload_timeout),
        # if timeout try to send the reload command again
        (pexpect.TIMEOUT, [0], 0, partial(a_send_line, self.reload_cmd), 10),
        (pexpect.TIMEOUT, [2], -1, a_reconnect, 0),
        (pexpect.EOF, [0, 1, 2], -1, a_disconnect, 0)
    ]
    state_machine = FSM("IOS-RELOAD", self.device, events, transitions,
                        timeout=10, max_transitions=5)
    return state_machine.run()
def authenticate(self, driver):
    """Authenticate using the SSH protocol specific FSM.

    :param driver: object supplying ``press_return_re`` and ``password_re``.
    :return: whatever ``FSM.run()`` returns.
    """
    press_return_re = driver.press_return_re
    password_re = driver.password_re
    prompt_re = self.device.prompt_re

    #          0                1            2          3
    events = [press_return_re, password_re, prompt_re, pexpect.TIMEOUT]

    # NOTE(review): each tuple is presumably (event, source states,
    # next state, action, timeout) -- confirm against the FSM class.
    transitions = [
        (press_return_re, [0, 1], 1, partial(a_send, "\r\n"), 10),
        (password_re, [0], 1, partial(a_send_password, self._acquire_password()),
         _C['first_prompt_timeout']),
        (password_re, [1], -1, a_authentication_error, 0),
        (prompt_re, [0, 1], -1, None, 0),
    ]
    # On timeout: fail for the target device, otherwise nudge with a newline.
    if self.device.is_target:
        on_timeout = ConnectionError("Error getting device prompt")
    else:
        on_timeout = partial(a_send, "\r\n")
    transitions.append((pexpect.TIMEOUT, [1], -1, on_timeout, 0))

    self.log("EXPECTED_PROMPT={}".format(pattern_to_str(self.device.prompt_re)))
    state_machine = FSM("SSH-AUTH", self.device, events, transitions,
                        init_pattern=self.last_pattern, timeout=30)
    return state_machine.run()
def restart(self):
    """Restart the ``ovs-vswitchd`` instance; ``ovsdb-server`` is left running.

    If the vswitchd pidfile exists, the recorded process is terminated
    first.  The process is then started again.  Should the start fail with
    EOF or TIMEOUT, ovsdb is killed and the exception is re-raised.

    :raises: pexpect.EOF, pexpect.TIMEOUT
    """
    self._logger.info("Restarting vswitchd...")

    pidfile_path = self._vswitchd_pidfile_path
    if os.path.isfile(pidfile_path):
        self._logger.info('Killing ovs-vswitchd...')
        with open(pidfile_path, "r") as pidfile:
            vswitchd_pid = pidfile.read().strip()
            tasks.terminate_task(vswitchd_pid, logger=self._logger)

    try:
        tasks.Process.start(self)
        self.relinquish()
    except (pexpect.EOF, pexpect.TIMEOUT) as exc:
        logging.error("Exception during VSwitch start.")
        self._kill_ovsdb()
        raise exc
    self._logger.info("Vswitchd...Started.")
def __init__(self, strings):
    '''Create a searcher_string instance.

    'strings' may be a list or other sequence of strings, and may also
    contain the EOF or TIMEOUT types.  The positions of EOF / TIMEOUT are
    remembered in ``eof_index`` / ``timeout_index`` (-1 when absent); all
    other entries are kept in ``_strings`` as ``(index, string)`` pairs.
    '''
    self.eof_index = -1
    self.timeout_index = -1
    self._strings = []
    for position, candidate in enumerate(strings):
        if candidate is EOF:
            self.eof_index = position
        elif candidate is TIMEOUT:
            self.timeout_index = position
        else:
            self._strings.append((position, candidate))
def __str__(self):
    '''Return a human-readable description of this searcher.

    The output starts with a 'searcher_re:' header followed by one line per
    pattern (including EOF / TIMEOUT when present), ordered by index.'''

    # The header uses index -1 so that it sorts ahead of every pattern.
    entries = [(-1, 'searcher_re:')]
    for index, regex in self._searches:
        try:
            line = ' %d: re.compile("%s")' % (index, regex.pattern)
        except UnicodeEncodeError:
            # for test cases that display __str__ of searches, dont throw
            # another exception just because stdout is ascii-only, using
            # repr()
            line = ' %d: re.compile(%r)' % (index, regex.pattern)
        entries.append((index, line))
    if self.eof_index >= 0:
        entries.append((self.eof_index, ' %d: EOF' % self.eof_index))
    if self.timeout_index >= 0:
        entries.append((self.timeout_index,
                        ' %d: TIMEOUT' % self.timeout_index))
    return '\n'.join(text for _, text in sorted(entries))
def __init__(self, patterns):
    '''This creates an instance that searches for 'patterns' Where
    'patterns' may be a list or other sequence of compiled regular
    expressions, or the EOF or TIMEOUT types.

    The position of EOF / TIMEOUT in 'patterns' is recorded in
    ``eof_index`` / ``timeout_index`` (-1 when absent); every other entry is
    stored in ``_searches`` as an ``(index, pattern)`` pair.'''

    self.eof_index = -1
    self.timeout_index = -1
    self._searches = []
    # enumerate() keeps the original positions without needing len().
    for n, s in enumerate(patterns):
        if s is EOF:
            self.eof_index = n
            continue
        if s is TIMEOUT:
            self.timeout_index = n
            continue
        self._searches.append((n, s))
def poll(self, timeout=None):
    """Report whether output from the child process is available.

    Reads one character at a time from the child into ``self.pending_data``
    until the expect call times out or the child reaches EOF.

    :param timeout: expect timeout in seconds; ``None`` is treated as 0.
    :return: True if there is pending data, False otherwise.
    :raises EOFError: if the child is done and no data is pending.
    """
    if self.pending_data:
        return True
    if self.done:
        raise EOFError('Child process is done')

    wait = 0 if timeout is None else timeout
    while True:
        try:
            self.child.expect(r'.', timeout=wait)
        except pexpect.TIMEOUT:
            return bool(self.pending_data)
        except pexpect.EOF:
            # Keep whatever arrived before the child closed its output.
            self.pending_data += self.child.before
            self.done = True
            if self.pending_data:
                return True
            raise EOFError('Child process is done')
        self.pending_data += self.child.before + self.child.after
def _prepare_smartcard(self, name, filename):
    """Remove and re-add a smartcard through ``ssh-add``.

    First runs ``ssh-add -e`` for ``filename``, then ``ssh-add -s`` with a
    14400 second lifetime, prompting the user for the passcode.

    :param name: display name of the smartcard, used in the passcode prompt.
    :param filename: value passed to ``ssh-add -e`` / ``ssh-add -s``.
    :return: True on success.
    :raises RuntimeError: if any of the ``ssh-add`` steps times out.
    """
    import pexpect

    remover = pexpect.spawn('ssh-add -e "{}"'.format(filename))
    i = remover.expect(["Card removed:", "Could not remove card", pexpect.TIMEOUT])
    # Both "removed" and "could not remove" are accepted; only a timeout is fatal.
    if i == 2:
        raise RuntimeError("Unable to reset card using ssh-agent. Output of ssh-agent was: \n{}\n\n"
                           "Please report this error!".format(remover.before))

    adder = pexpect.spawn('ssh-add -s "{}" -t 14400'.format(filename))
    i = adder.expect(['Enter passphrase for PKCS#11:', pexpect.TIMEOUT])
    if i != 0:
        # Report the output of the process that actually failed (adder).
        raise RuntimeError("Unable to add card using ssh-agent. Output of ssh-agent was: \n{}\n\n"
                           "Please report this error!".format(adder.before))
    adder.sendline(getpass.getpass('Please enter your passcode to unlock your "{}" smartcard: '.format(name)))

    i = adder.expect(['Card added:', pexpect.TIMEOUT])
    if i != 0:
        raise RuntimeError("Unexpected error while adding card. Check your passcode and try again.")

    return True
def testSpecificVLAN( self ):
    """Test connectivity between hosts on a specific VLAN.

    Runs the vlanhost example with VLAN 1001, pings h2 from h1 and checks
    that h1 has the VLAN interface."""
    vlan = 1001
    p = pexpect.spawn( 'python -m mininet.examples.vlanhost %d' % vlan )
    p.expect( self.prompt )

    p.sendline( 'h1 ping -c 1 h2' )
    # Raw string: '\d' is not a valid escape in a normal string literal
    p.expect ( r'(\d+)% packet loss' )
    percent = int( p.match.group( 1 ) ) if p.match else -1
    p.expect( self.prompt )

    p.sendline( 'h1 ifconfig' )
    i = p.expect( ['h1-eth0.%d' % vlan, pexpect.TIMEOUT ], timeout=2 )
    p.expect( self.prompt )

    p.sendline( 'exit' )
    p.wait()
    self.assertEqual( percent, 0 ) # no packet loss on ping
    self.assertEqual( i, 0 ) # check vlan intf is present
def setUp( self ):
    """Start the baresshd example with a throw-away key pair.

    Fails the test (after calling tearDown) if anything other than the
    "You may now ssh into h1" message is seen."""
    # verify that sshd is not running
    self.assertFalse( self.connected() )

    # create public key pair for testing
    for command in ( 'rm -rf /tmp/ssh',
                     'mkdir /tmp/ssh',
                     "ssh-keygen -t rsa -P '' -f /tmp/ssh/test_rsa",
                     'cat /tmp/ssh/test_rsa.pub >> /tmp/ssh/authorized_keys' ):
        sh( command )

    # run example with custom sshd args
    cmd = ( 'python -m mininet.examples.baresshd '
            '-o AuthorizedKeysFile=/tmp/ssh/authorized_keys '
            '-o StrictModes=no' )
    p = pexpect.spawn( cmd )
    runOpts = [ 'You may now ssh into h1 at 10.0.0.1',
                'after 5 seconds, h1 is not listening on port 22',
                pexpect.EOF, pexpect.TIMEOUT ]
    # Index 0 is the only successful outcome
    while p.expect( runOpts ) != 0:
        self.tearDown()
        self.fail( 'sshd failed to start in host h1' )
def coreTest( vm, prompt=Prompt ):
    """Run core tests (make test) in VM.

    vm: pexpect-style session connected to the VM
    prompt: pattern matching the VM's shell prompt"""
    log( '* Making sure cgroups are mounted' )
    for command in ( 'sudo -n service cgroup-lite restart',
                     'sudo -n cgroups-mount' ):
        vm.sendline( command )
        vm.expect( prompt )
    log( '* Running make test' )
    vm.sendline( 'cd ~/mininet; sudo make test' )
    # We should change "make test" to report the number of
    # successful and failed tests. For now, we have to
    # know the time for each test, which means that this
    # script will have to change as we add more tests.
    outcomes = [ 'OK.*\r\n', 'FAILED.*\r\n', pexpect.TIMEOUT ]
    for test in range( 2 ):
        result = vm.expect( outcomes, timeout=180 )
        if result == 0:
            log( '* Test', test, 'OK' )
        else:
            # Both an explicit FAILED line and a timeout count as failure
            log( '* Test', test, 'FAILED' )
            log( '* Test', test, 'output:' )
            log( vm.before )
def notification_loop( self ):
    """Dispatch notifications read from the connection until a timeout.

    Waits (4 s at a time) for a 'Notification handle = ...' line, parses the
    handle and the following hex values, and calls the callback registered
    for that handle in self.cb.  The loop ends at the first timeout.
    Exceptions raised by a callback are not caught here."""
    while True:
        try:
            pnum = self.con.expect('Notification handle = .*? \r', timeout=4)
        except pexpect.TIMEOUT:
            print("TIMEOUT exception!")
            break
        if pnum != 0:
            print("TIMEOUT!!")
            continue
        # Tokens after the first three words: the handle, a separator
        # token, then the value bytes (all parsed as hex).
        hex_fields = self.con.after.split()[3:]
        print("****")
        handle = long(float.fromhex(hex_fields[0]))
        values = [long(float.fromhex(n)) for n in hex_fields[2:]]
        self.cb[handle](values)
def ssh_remote_host(host, user, password):
    """Open an interactive ssh session to ``user@host``.

    Answers the host-key confirmation (if asked) and the password prompt,
    waits for a shell prompt and then hands control to the user.  On EOF or
    timeout nothing further is attempted.  The child is closed in every
    case.

    :param host: remote host name or address.
    :param user: remote login name.
    :param password: password sent at the password prompt.
    """
    command = 'ssh ' + user + '@' + host
    child = pexpect.spawn("/bin/bash", ['-c', command])
    outcome = child.expect(['[Pp]assword:', SSH_NEWKEY, pexpect.EOF, pexpect.TIMEOUT])

    # 0: password prompt, 1: new host key question, 2: EOF, 3: timeout
    if outcome in (0, 1):
        if outcome == 1:
            child.sendline('yes')
            child.expect('[Pp]assword:')
        child.sendline(password)
        child.expect(".*[$#]")
        child.send('\n')
        child.interact()

    child.close()
    return
def test_searcher_re (self):
    """Check the string form of searcher_re, with and without TIMEOUT/EOF."""
    # This should be done programatically, if we copied and pasted output,
    # there wouldnt be a whole lot to test, really, other than our ability
    # to copy and paste correctly :-)
    words = ('this', 'that', 'and', 'the', 'other')
    ss = pexpect.searcher_re ([re.compile(word) for word in words])
    out = ('searcher_re:\n 0: re.compile("this")\n '
           '1: re.compile("that")\n 2: re.compile("and")\n '
           '3: re.compile("the")\n 4: re.compile("other")')
    rendered = ss.__str__()
    assert rendered == out, (rendered, out)

    # Same patterns with the TIMEOUT and EOF sentinels mixed in
    mixed = [pexpect.TIMEOUT]
    mixed.extend(re.compile(word) for word in ('this', 'that', 'and'))
    mixed.append(pexpect.EOF)
    mixed.append(re.compile('other'))
    ss = pexpect.searcher_re (mixed)
    out = ('searcher_re:\n 0: TIMEOUT\n 1: re.compile("this")\n '
           '2: re.compile("that")\n 3: re.compile("and")\n '
           '4: EOF\n 5: re.compile("other")')
    rendered = ss.__str__()
    assert rendered == out, (rendered, out)
def startPool(self):
    """Starts the Device pool.

    Does nothing when ``needsPool()`` is false.  Otherwise spawns the pool
    executable and waits, up to ``PoolTestCase.PoolMaxStartupTime``, for its
    ready message.  The test is failed if the pool is already running,
    terminates unexpectedly, or does not become ready in time.
    """
    if not self.needsPool():
        return
    self.pool_process = pexpect.spawn(self.pool_exec,
                                      args=[self.pool_ds_instance])
    # make sure the device pool sends events
    self.pool_process.delayafterterminate = 2
    idx = self.pool_process.expect([PoolTestCase.PoolReadyMsg,
                                    PoolTestCase.PoolAlreadyRunning,
                                    pexpect.EOF,
                                    pexpect.TIMEOUT],
                                   timeout=PoolTestCase.PoolMaxStartupTime)
    # self.fail() replaces the deprecated assert_(False, msg) alias.
    if idx == 0:
        return
    elif idx == 1:
        self.fail(PoolTestCase.PoolAlreadyRunning)
    elif idx == 2:
        self.fail("Device Pool terminated unexpectedly")
    elif idx == 3:
        self.fail("Device Pool startup time exceeded")
def startMotorSimulator(self):
    """Starts the Motor Simulator.

    Does nothing when ``needsMotorSimulator()`` is false.  Otherwise spawns
    the simulator with ``python`` and waits, up to
    ``PoolTestCase.MotorSimulatorMaxStartupTime``, for its ready message.
    The test is failed if the simulator is already running, terminates
    unexpectedly, or does not become ready in time.
    """
    if not self.needsMotorSimulator():
        return
    self.motsim_process = pexpect.spawn("python",
                                        args=[self.motsim_exec,
                                              self.motsim_ds_instance])
    idx = self.motsim_process.expect([PoolTestCase.MotorSimulatorReadyMsg,
                                      PoolTestCase.MotorSimulatorAlreadyRunning,
                                      pexpect.EOF,
                                      pexpect.TIMEOUT],
                                     timeout=PoolTestCase.MotorSimulatorMaxStartupTime)
    # self.fail() replaces the deprecated assert_(False, msg) alias.
    if idx == 0:
        return
    elif idx == 1:
        self.fail(PoolTestCase.MotorSimulatorAlreadyRunning)
    elif idx == 2:
        self.fail("Motor Simulator terminated unexpectedly")
    elif idx == 3:
        self.fail("Motor Simulator startup time exceeded")
def expect_exact(self, pattern_list, timeout=-1, searchwindowsize=-1):
    """This is similar to expect(), but uses plain string matching instead
    of compiled regular expressions in 'pattern_list'. The 'pattern_list'
    may be a string; a list or other sequence of strings; or TIMEOUT and
    EOF.

    This call might be faster than expect() for two reasons: string
    searching is faster than RE matching and it is possible to limit the
    search to just the end of the input buffer.

    This method is also useful when you don't want to have to worry about
    escaping regular expression characters that you want to match."""

    # A single pattern is promoted to a one-element list.  isinstance() is
    # used rather than an exact type test so that subclasses of the string
    # types are also recognised as a single pattern.
    if isinstance(pattern_list, types.StringTypes) or pattern_list in (TIMEOUT, EOF):
        pattern_list = [pattern_list]
    return self.expect_loop(searcher_string(pattern_list), timeout, searchwindowsize)
def expect_exact_better(proc, expected):
    """Match ``expected`` literally, anchored at the start of the output.

    Like expect_exact this treats ``expected`` as a raw string, but the
    match must begin at the start of the buffer.  Newlines in ``expected``
    also match when preceded by carriage returns.

    :param proc: pexpect-style process to read from.
    :param expected: literal text that must come next.
    :raises AssertionError: with a context / expected / actual report if
        the text is not seen before the expect call times out.
    """
    # I'd put a $ on the end of this regex, but sometimes the buffer comes
    # to us too quickly for our assertions
    before = proc.before
    after = proc.after
    pattern = ('^' + re.escape(expected)).replace('\n', '\r*\n')
    try:
        proc.expect(pattern)
    except pexpect.TIMEOUT:  # pragma: no cover
        report = [
            'Incorrect output.',
            '>>> Context:',
            before.decode('utf8') + after.decode('utf8'),
            '>>> Expected:',
            ' ' + expected.replace('\r', '').replace('\n', '\n '),
            '>>> Actual:',
            ' ' + proc.buffer.replace(b'\r', b'').replace(b'\n', b'\n ').decode('utf8'),
        ]
        message = '\n'.join(report)
        if sys.version_info < (3, 0):
            message = message.encode('utf8')
        raise AssertionError(message)
def gather_input(child, seconds):
    """Let ``child`` collect input for ``seconds`` without matching anything.

    The resulting ``pexpect.TIMEOUT`` is expected and swallowed.
    """
    never_matches = "(?!)"  # This regexp will never match
    try:
        child.expect(never_matches, timeout=seconds)
    except pexpect.TIMEOUT:
        pass

#############################################################################

# A NetBSD version.
#
# Subclasses should define:
#
#    dist_url(self)
#       the top-level URL for the machine-dependent download tree where
#       the version can be downloaded, for example,
#       ftp://ftp.netbsd.org/pub/NetBSD/NetBSD-5.0.2/i386/
#
#    mi_url(self)
#       The top-level URL for the machine-independent download tree,
#       for example, ftp://ftp.netbsd.org/pub/NetBSD/NetBSD-5.0.2/
#
#    default_workdir(self)
#       a file name component identifying the version, for use in
#       constructing a unique, version-specific working directory
#
#    arch(self)
#       the name of the machine architecture the version is for,
#       e.g., i386