我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用pexpect.EOF。
def recv_ClientInit(self, block):
    """Start the reward proxy, wait for its banner, then handle ClientInit.

    The binary named by ``self.factory.reward_proxy_bin`` is spawned with
    its output echoed to stdout.  Once the ``[RewardProxyServer]`` banner
    has been seen, a thread is started that waits on the proxy until EOF,
    and the message is handed to the parent class.

    Args:
        block: ClientInit payload, forwarded unchanged to the parent
            implementation.
    """
    # Start reward proxy.
    self._log_info('Starting reward proxy server')
    self.reward_proxy = pexpect.spawnu(
        self.factory.reward_proxy_bin, logfile=sys.stdout, timeout=None)

    # Wait on reward proxy to be up.  The pattern is a raw string so that
    # ``\[`` is a regex escape and not an invalid string escape sequence.
    self._log_info('Waiting for reward proxy server')
    self.reward_proxy.expect(r'\[RewardProxyServer\]')

    # Background thread that blocks on the proxy until it reaches EOF.
    self.reward_proxy_thread = threading.Thread(
        target=lambda: self.reward_proxy.expect(pexpect.EOF))
    self.reward_proxy_thread.start()
    self._log_info('Reward proxy server is up %s', self.reward_proxy.before)

    super(DualProxyServer, self).recv_ClientInit(block)
    self.logfile_dir = self.log_manager.logfile_dir
def test_iter(self):
    '''Check that iterating over a spawned ``cat`` yields its output.

    See the note in test_readlines() for an explanation as to why several
    interleavings of the echoed and printed lines are accepted.  Basically,
    this is done to handle a valid condition on slow systems.
    '''
    child = pexpect.spawn('cat')
    for text in ("abc", "123"):
        child.sendline(text)
        time.sleep(0.5)
    child.sendeof()

    # Don't use ''.join() because we want to test the ITERATOR.
    page = b''
    for line in child:
        page += line
    page = page.replace(_CAT_EOF, b'')

    # This is just a really bad test all together, we should write our
    # own 'cat' utility that only writes to stdout after EOF is recv,
    # this must take into consideration all possible platform impl.'s
    # of `cat', and their related terminal and line-buffer handling
    acceptable = (
        b'abc\r\nabc\r\n123\r\n123\r\n',
        b'abc\r\n123\r\nabc\r\n123\r\n',
        b'abc\r\n123abc\r\n\r\n123\r\n',
    )
    assert page in acceptable, "iterator did not work. page=%r" % (page,)
def shell_cmd(child, cmd, timeout=-1):
    """Run ``cmd`` in a fresh /bin/sh on ``child`` and return its exit status.

    The shell is given a generated prompt so that the end of the command's
    output can be recognised.  If the child reaches EOF while the command
    is running, its exit and signal status are printed and the EOF is
    re-raised.

    Args:
        child: pexpect-style session with a root shell prompt (``# ``).
        cmd: shell command line to execute.
        timeout: timeout handed to the ``expect`` calls that wait for the
            prompt after the command.

    Returns:
        The command's exit status as an int, parsed from ``$?``.

    Raises:
        pexpect.EOF: the child exited before the prompt came back.
    """
    child.send("exec /bin/sh\n")
    child.expect("# ")
    prompt = gen_shell_prompt()
    child.send("PS1=" + quote_prompt(prompt) + "\n")
    prompt_re = prompt
    child.expect(prompt_re)
    child.send(cmd + "\n")
    # Catch EOF to log the signalstatus, to help debug qemu crashes
    try:
        child.expect(prompt_re, timeout)
    except pexpect.EOF:
        print("pexpect reported EOF - VMM exited unexpectedly")
        child.close()
        print("exitstatus %s" % (child.exitstatus,))
        print("signalstatus %s" % (child.signalstatus,))
        raise
    child.send("echo exit_status=$?=\n")
    child.expect(r"exit_status=(\d+)=")
    r = int(child.match.group(1))
    child.expect(prompt_re, timeout)
    return r
def RunCommand(args, timeout=None, logfile=None):
    """Runs a given command through pexpect.spawn and waits for it to end.

    The command is spawned, its output is consumed until EOF, and the
    child is closed so that its exit status can be inspected.  Exceptions
    raised by the pexpect calls themselves are not caught here.

    Args:
        args: command with arguments as an array
        timeout: timeout handed to pexpect.spawn
        logfile: an opened filestream to write the output

    Raises:
        RuntimeError: Command's exit status is not zero

    Returns:
        0 if the command's exit status is zero (a non-zero status raises
        instead of returning).
    """
    child = pexpect.spawn(args[0], args=args[1:], timeout=timeout,
                          logfile=logfile)
    child.expect(pexpect.EOF)
    child.close()
    if child.exitstatus:
        print(args)
        raise RuntimeError(("Error: {}\nProblem running command. "
                            "Exit status: {}").format(child.before,
                                                      child.exitstatus))
    return 0
def read(fd, **kwargs):
    """
    Reads at most size bytes from the file (less if the read hits EOF before
    obtaining size bytes).

    :Arguments:
        fd - file descriptor got from open_file
    :Optional:
        size - number of bytes to be read
    :Return:
        the string read from the file, 0 if not able to read
    """
    try:
        # Built-in file objects do not accept ``size`` as a keyword
        # argument, so it is handed over positionally when supplied.
        positional = (kwargs.pop("size"),) if "size" in kwargs else ()
        data = fd.read(*positional, **kwargs)
        # Log the amount of data, not the data itself.
        print_info("read {} bytes from file {}".format(len(data), fd.name))
    except ValueError:
        print_error("file is already closed...")
        data = 0
    except Exception as e:
        print_error("found exception {} while reading {}".format(str(e), fd))
        data = 0
    return data
def readlines(fd, **kwargs):
    """
    Reads the file through ``fd.readlines`` and returns the resulting list
    of lines.  Any keyword arguments are forwarded to ``fd.readlines``.

    :Arguments:
        fd - file descriptor got from open_file
    :Return:
        list of lines from the file, or False when reading failed
    """
    try:
        content = fd.readlines(**kwargs)
        print_info("read all lines from file "+fd.name)
    except ValueError:
        print_error("file is already closed...")
    except Exception as e:
        print_error("found exception {} while reading lines in {}".
                    format(str(e), fd))
    else:
        return content
    # Either handler above ran: signal the failure to the caller.
    return False
def readline(self, size=-1):    # File-like object.
    """Read and return one entire line.

    A trailing newline is kept in the string, but may be absent when a
    file ends with an incomplete line.  Note: this readline() looks for a
    \\r\\n pair even on UNIX because this is what the pseudo tty device
    returns, so contrary to what you may expect you will receive the
    newline as \\r\\n.  An empty string is returned when EOF is hit
    immediately.  Currently, the size argument is mostly ignored, so this
    behavior is not standard for a file-like object.  If size is 0 then an
    empty string is returned.
    """
    if size == 0:
        return ''
    # delimiter default is EOF
    matched = self.expect(['\r\n', self.delimiter])
    return (self.before + '\r\n') if matched == 0 else self.before
def expect_exact(self, pattern_list, timeout=-1, searchwindowsize=-1):
    """This is similar to expect(), but uses plain string matching instead
    of compiled regular expressions in 'pattern_list'. The 'pattern_list'
    may be a string; a list or other sequence of strings; or TIMEOUT and
    EOF.

    This call might be faster than expect() for two reasons: string
    searching is faster than RE matching and it is possible to limit the
    search to just the end of the input buffer.

    This method is also useful when you don't want to have to worry about
    escaping regular expression characters that you want to match."""
    # isinstance() also recognises subclasses of the string types, which
    # an exact type() comparison would wrongly treat as a sequence.
    if isinstance(pattern_list, types.StringTypes) or pattern_list in (TIMEOUT, EOF):
        pattern_list = [pattern_list]
    return self.expect_loop(searcher_string(pattern_list), timeout, searchwindowsize)
def __init__(self, strings):
    """This creates an instance of searcher_string. This argument 'strings'
    may be a list; a sequence of strings; or the EOF or TIMEOUT types.

    The position of EOF and TIMEOUT in 'strings' is remembered in
    eof_index and timeout_index (-1 when absent); every other entry is
    stored in _strings together with its position.
    """
    self.eof_index = -1
    self.timeout_index = -1
    self._strings = []
    # enumerate() pairs each entry with its position without needing len().
    for n, s in enumerate(strings):
        if s is EOF:
            self.eof_index = n
            continue
        if s is TIMEOUT:
            self.timeout_index = n
            continue
        self._strings.append((n, s))
def tar2db(firmware_id):
    """Populates the db with information related to the filesystem.

    Returns True when the command ran to completion or reported that the
    keys already exist, and False when it reported a missing file.
    """
    print(bcolors.OKBLUE + "[-] Writing filesystem information into database..." + bcolors.ENDC)
    command = TAR2DB_COMMAND.format(FIRMADYNE_PATH, firmware_id, OUTPUT_DIR)
    print(bcolors.ITALIC + command + bcolors.ENDC)
    child = pexpect.spawn(command)

    # either an error is raised because keys already exist
    # which means the info has already been written in the db
    # or the command terminates properly
    outcome = child.expect(['Key.*already exists',
                            'No such file or directory: .*\n',
                            pexpect.EOF])
    if outcome == 0:
        print(bcolors.WARNING + "[!] This step was already performed earlier..." + bcolors.ENDC)
        return True
    if outcome == 1:
        # The matched text has the form "<message>: <path>\n"; keep the path.
        first_line = child.after.split('\n')[0]
        missing_file = first_line.split(':')[1].strip()
        print(bcolors.FAIL + "[!] The file {} does not exist...".format(missing_file) + bcolors.ENDC)
        return False
    print(bcolors.OKGREEN + "[+] Filesystem information successfully written!" + bcolors.ENDC)
    return True
def query(self, cmd):
    """Send ``cmd`` to the console and classify the response.

    Returns the text before the match for pattern index 0, '^exit' for
    index 1 (after closing this object) and '^kill' for index 2 (after
    closing the connection).

    Raises errors.DeadConsoleError when the console is not alive and
    errors.ConsoleSessionError on a pexpect TIMEOUT or EOF.
    """
    if not self.con.isalive():
        raise errors.DeadConsoleError
    self.con.sendline(cmd)
    try:
        # Resulting order: [prompt_re, exit_re, ps1_re], minus unset ones.
        # NOTE(review): the index checks below assume all three patterns
        # are present - confirm exit_re/prompt_re are always set.
        patterns = [self.ps1_re]
        for optional in (self.exit_re, self.prompt_re):
            if optional:
                patterns.insert(0, optional)
        matched = self.con.expect(patterns)
        if matched == 0:
            return self.con.before
        elif matched == 1:
            self.close()
            return '^exit'
        elif matched == 2:
            self.con.close()
            return '^kill'
    except (pexpect.TIMEOUT, pexpect.EOF):
        ## Connection's probably dead, close the socket
        self.close()
        raise errors.ConsoleSessionError
    raise errors.UnexpectedResponseError
def test_run_command_interactively(self):
    # Runs ``rm -i`` through the run_command_interactively step and checks
    # that the file is only removed once the prompt has been answered.
    file_path = os.path.join(tempfile.gettempdir(), 'test_interactive.txt')
    # 'w' is the valid write mode; 'wr' is rejected by Python 3's open().
    with open(file_path, 'w') as ff:
        ff.write('Some text')

    context = self.execute_module_step(
        'run_command_interactively',
        kwargs={
            'command': 'rm -i %s' % file_path,
        }
    )

    # file should not be removed yet
    assert_that(os.path.isfile(file_path), equal_to(True))

    # let's communicate and say Yes
    context.command_response['child'].sendline('Y')
    context.command_response['child'].expect(pexpect.EOF)

    # file should be removed
    assert_that(os.path.isfile(file_path), equal_to(False))
def run(command, fail_on_error=False, interactively=False):
    """Run ``command`` through ``/bin/sh -c`` under pexpect.

    Everything read from and sent to the child is captured in in-memory
    buffers (``child.logfile_read`` / ``child.logfile_send``).

    Args:
        command: shell command line to execute.
        fail_on_error: when true (and not interactive), raise if the
            command finishes with a non-zero exit status.
        interactively: when true, return right after spawning so that the
            caller can talk to the child; otherwise wait for EOF first.

    Returns:
        dict with the spawned child under the key ``'child'``.

    Raises:
        Exception: ``fail_on_error`` is set and the exit status is
            non-zero; the message holds the captured output.
    """
    child = pexpect.spawn('/bin/sh', ['-c', command], echo=False)
    child.logfile_read = StringIO.StringIO()
    child.logfile_send = StringIO.StringIO()

    if not interactively:
        child.expect(pexpect.EOF)
        # Reaching EOF does not by itself collect the exit status;
        # close() reaps the child so that ``exitstatus`` is populated.
        child.close()
        # Truthiness check: a None status (e.g. killed by a signal) must
        # not be compared with an integer.
        if fail_on_error and child.exitstatus:
            raise Exception(
                '%s (exit code %s)' % (
                    child.logfile_read.getvalue(),
                    child.exitstatus
                )
            )

    return {
        'child': child,
    }
def main():
    """Loop forever, restarting the tunnel when ``ps`` shows no ssh.

    Each round spawns ``ps`` and looks for '/usr/bin/ssh' in its output.
    When the output ends without a match, start_tunnel() is called.
    """
    while True:
        proc = pexpect.spawn('ps')
        time.sleep(1)
        matched = proc.expect(['/usr/bin/ssh', pexpect.EOF, pexpect.TIMEOUT])
        if matched == 2:
            print('TIMEOUT in ps command...')
            print(str(proc))
            time.sleep(13)
        if matched != 1:
            # Tunnel found (or the check timed out): just wait a while.
            # print 'tunnel OK'
            time.sleep(7)
            continue
        # EOF without seeing ssh: the tunnel is gone.
        print(time.asctime(), end=' ')
        print('restarting tunnel')
        start_tunnel()
        time.sleep(11)
        print('tunnel OK')
def run(self):
    """Wait for registered patterns on the connection and fire their events.

    Loops while ``self._parent_aliveness`` is set.  For each match the
    corresponding event record receives the connection's before / after /
    match values, its event is set and its callback (if any) is called
    with the record.  On NotConnectedError or EOF the "disconnected"
    event is set and the loop ends.
    """
    events = list(self._event_vector.values())
    patterns = [entry["pattern"] for entry in events]

    log.info('Running...')
    while self._parent_aliveness.is_set():
        try:
            event_index = self._connection.expect(patterns, timeout=.5)
        except pexpect.TIMEOUT:
            # Nothing matched in this slice; re-check the aliveness flag.
            continue
        except (NotConnectedError, pexpect.EOF):
            self._event_vector["disconnected"]["event"].set()
            break

        matched = events[event_index]
        matched["before"] = self._connection.before
        matched["after"] = self._connection.after
        matched["match"] = self._connection.match
        matched["event"].set()
        if matched["callback"]:
            matched["callback"](matched)
    log.info("Listener thread finished")
def reload(self, reload_timeout=300, save_config=True):
    """Reload the device.

    Expected dialogue::

        CSM_DUT#reload
        System configuration has been modified. Save? [yes/no]: yes
        Building configuration...
        [OK]
        Proceed with reload? [confirm]

    ``save_config`` selects the yes/no answer to the save prompt and
    ``reload_timeout`` is used in the PROCEED transition.  Returns the
    result of running the state machine.
    """
    answer = "yes" if save_config else "no"

    events = [SAVE_CONFIG, PROCEED, pexpect.TIMEOUT, pexpect.EOF]

    # NOTE(review): tuples look like (event, source states, next state,
    # action, timeout) - confirm against the FSM class.
    on_save_prompt = (SAVE_CONFIG, [0], 1, partial(a_send_line, answer), 60)
    on_proceed_prompt = (PROCEED, [0, 1], 2, partial(a_send, "\r"), reload_timeout)
    # if timeout try to send the reload command again
    on_initial_timeout = (pexpect.TIMEOUT, [0], 0, partial(a_send_line, self.reload_cmd), 10)
    on_final_timeout = (pexpect.TIMEOUT, [2], -1, a_reconnect, 0)
    on_eof = (pexpect.EOF, [0, 1, 2], -1, a_disconnect, 0)

    transitions = [
        on_save_prompt,
        on_proceed_prompt,
        on_initial_timeout,
        on_final_timeout,
        on_eof,
    ]
    fsm = FSM("IOS-RELOAD", self.device, events, transitions, timeout=10, max_transitions=5)
    return fsm.run()
def restart(self):
    """ Restart ``ovs-vswitchd`` instance. ``ovsdb-server`` is not restarted.

    If the vswitchd pidfile exists, the process it names is terminated
    first.  When starting fails, ovsdb is killed and the error propagates.

    :raises: pexpect.EOF, pexpect.TIMEOUT
    """
    self._logger.info("Restarting vswitchd...")
    if os.path.isfile(self._vswitchd_pidfile_path):
        self._logger.info('Killing ovs-vswitchd...')
        with open(self._vswitchd_pidfile_path, "r") as pidfile:
            vswitchd_pid = pidfile.read().strip()
        tasks.terminate_task(vswitchd_pid, logger=self._logger)

    try:
        tasks.Process.start(self)
        self.relinquish()
    except (pexpect.EOF, pexpect.TIMEOUT):
        logging.error("Exception during VSwitch start.")
        self._kill_ovsdb()
        # Bare raise re-raises the active exception with its traceback.
        raise
    self._logger.info("Vswitchd...Started.")
def start(self):
    """Activates DPDK kernel modules and starts VPP

    The command line is assembled from ``self._cmd_template`` and
    ``self._vswitch_args`` before the process is started.

    :raises: pexpect.EOF, pexpect.TIMEOUT
    """
    dpdk.init()
    self._logger.info("Starting VPP...")

    self._cmd = self._cmd_template + self._vswitch_args

    try:
        tasks.Process.start(self)
        self.relinquish()
    except (pexpect.EOF, pexpect.TIMEOUT):
        logging.error("Exception during VPP start.")
        # Bare raise re-raises the active exception with its traceback.
        raise

    self._logger.info("VPP...Started.")
def stop(self):
    """See IPktFwd for general description

    Kills testpmd: sends 'stop', waits for 'Done.', sends 'quit' and kills
    the process, then cleans up DPDK.
    """
    pmd = self._testpmd
    try:
        pmd.send('stop')
        pmd.wait('Done.', 5)
        pmd.send('quit', 2)
        pmd.kill()
    except pexpect.EOF:
        # EOF at any step is ignored; cleanup below still runs.
        pass
    dpdk.cleanup()

# Method could be a function
# pylint: disable=no-self-use
def test_place_match(place):
    """Add and delete place matches through the client, then show the place.

    Every client invocation must run to EOF and exit with status 0.
    """
    def run_client(command, expected_output=()):
        # Run one client command, expect the given patterns in order,
        # then require EOF and a zero exit status.
        with pexpect.spawn(command) as spawn:
            for pattern in expected_output:
                spawn.expect(pattern)
            spawn.expect(pexpect.EOF)
            spawn.close()
            assert spawn.exitstatus == 0

    run_client('python -m labgrid.remote.client -p test add-match e1/g1/r1 e2/g2/*')
    run_client('python -m labgrid.remote.client -p test del-match e1/g1/r1')
    # NOTE(review): the patterns are regular expressions, so the trailing
    # '*' is a quantifier on '/', not a literal asterisk - confirm intent.
    run_client('python -m labgrid.remote.client -p test show',
               (" matches:", " e2/g2/*"))
def test_autoinstall_simple(tmpdir):
    """Run autoinstall once with a handler that prints a marker string.

    Writes a config file into ``tmpdir``, starts
    ``labgrid.autoinstall.main --once`` on it and checks that the marker
    appears in the output and that the process exits with status 0.
    """
    # NOTE(review): the line breaks and indentation inside the YAML literal
    # below were lost in the available source and have been reconstructed;
    # confirm against the upstream test before relying on it.
    c = tmpdir.join("config.yaml")
    c.write("""
    targets:
        test1:
            resources: {}
            drivers: {}
    autoinstall:
        handler: |
            print("handler-test-output")
    """)
    with pexpect.spawn('python -m labgrid.autoinstall.main --once {}'.format(c)) as spawn:
        # The handler's print output proves that the handler actually ran.
        spawn.expect("handler-test-output")
        spawn.expect(pexpect.EOF)
        spawn.close()
        assert spawn.exitstatus == 0
def readline(self, size=-1):
    '''Read and return one entire line.

    The newline at the end of line is returned as part of the string,
    unless the file ends without a newline. An empty string is returned
    if EOF is encountered immediately. This looks for a newline as a
    CR/LF pair (\\r\\n) even on UNIX because this is what the pseudotty
    device returns. So contrary to what you may expect you will receive
    newlines as \\r\\n.

    If the size argument is 0 then an empty string is returned. In all
    other cases the size argument is ignored, which is not standard
    behavior for a file-like object.
    '''
    if size == 0:
        return self.string_type()
    # delimiter default is EOF
    hit = self.expect([self.crlf, self.delimiter])
    line = self.before
    if hit == 0:
        # The CR/LF matched, so put it back on the returned line.
        line = line + self.crlf
    return line
def __init__(self, strings):
    '''Create an instance of searcher_string.

    The argument 'strings' may be a list; a sequence of strings; or the
    EOF or TIMEOUT types.  The positions of EOF and TIMEOUT are kept in
    eof_index and timeout_index (-1 when absent); all other entries are
    kept, with their position, in _strings.
    '''
    self.eof_index = -1
    self.timeout_index = -1
    self._strings = []
    for position, candidate in enumerate(strings):
        if candidate is EOF:
            self.eof_index = position
        elif candidate is TIMEOUT:
            self.timeout_index = position
        else:
            self._strings.append((position, candidate))
def __init__(self, patterns):
    '''This creates an instance that searches for 'patterns' Where
    'patterns' may be a list or other sequence of compiled regular
    expressions, or the EOF or TIMEOUT types.

    The position of EOF and TIMEOUT in 'patterns' is remembered in
    eof_index and timeout_index (-1 when absent); every other entry is
    stored in _searches together with its position.'''
    self.eof_index = -1
    self.timeout_index = -1
    self._searches = []
    # enumerate() pairs each entry with its position without needing len().
    for n, s in enumerate(patterns):
        if s is EOF:
            self.eof_index = n
            continue
        if s is TIMEOUT:
            self.timeout_index = n
            continue
        self._searches.append((n, s))
def __str__(self):
    '''Return a human-readable string that represents the state of the
    object: a header line followed by one line per pattern, EOF and
    TIMEOUT entry, ordered by index.'''
    # The header carries index -1 so that sorting puts it first.
    entries = [(-1, 'searcher_re:')]
    for n, s in self._searches:
        try:
            text = ' %d: re.compile("%s")' % (n, s.pattern)
        except UnicodeEncodeError:
            # for test cases that display __str__ of searches, dont throw
            # another exception just because stdout is ascii-only, using
            # repr()
            text = ' %d: re.compile(%r)' % (n, s.pattern)
        entries.append((n, text))
    if self.eof_index >= 0:
        entries.append((self.eof_index, ' %d: EOF' % self.eof_index))
    if self.timeout_index >= 0:
        entries.append((self.timeout_index,
                        ' %d: TIMEOUT' % self.timeout_index))
    entries.sort()
    return '\n'.join(text for _, text in entries)
def test_expect(self):
    # Compares the output of ``ls -l /bin`` collected through
    # subprocess with the output collected line by line via expect().
    def normalise(data):
        # Collapse the line-ending variants and drop trailing whitespace.
        data = data.replace(b'\r\n', b'\n')
        data = data.replace(b'\r', b'\n')
        data = data.replace(b'\n\n', b'\n')
        return data.rstrip()

    listing = subprocess.Popen(args=['ls', '-l', '/bin'],
                               stdout=subprocess.PIPE)
    the_old_way = listing.communicate()[0].rstrip()

    p = pexpect.spawn('ls -l /bin')
    the_new_way = b''
    while True:
        i = p.expect([b'\n', pexpect.EOF])
        the_new_way += p.before
        if i == 1:
            break

    the_new_way = normalise(the_new_way.rstrip())
    the_old_way = normalise(the_old_way)
    assert the_old_way == the_new_way, hex_diff(the_old_way, the_new_way)
def test_expect_exact(self):
    # Same comparison as for expect(), but collected with expect_exact();
    # afterwards checks that regex metacharacters are matched literally.
    def normalise(data):
        # Collapse the line-ending variants and drop trailing whitespace.
        data = data.replace(b'\r\n', b'\n')
        data = data.replace(b'\r', b'\n')
        data = data.replace(b'\n\n', b'\n')
        return data.rstrip()

    listing = subprocess.Popen(args=['ls', '-l', '/bin'],
                               stdout=subprocess.PIPE)
    the_old_way = listing.communicate()[0].rstrip()

    p = pexpect.spawn('ls -l /bin')
    the_new_way = b''
    while True:
        i = p.expect_exact([b'\n', pexpect.EOF])
        the_new_way += p.before
        if i == 1:
            break

    the_new_way = normalise(the_new_way)
    the_old_way = normalise(the_old_way)
    assert the_old_way == the_new_way, hex_diff(the_old_way, the_new_way)

    p = pexpect.spawn('echo hello.?world')
    i = p.expect_exact(b'.?')
    self.assertEqual(p.before, b'hello')
    self.assertEqual(p.after, b'.?')
def test_log_logfile_read(self):
    # Checks that logfile_read records what is read back from ``cat``:
    # the terminal echo of the line plus cat's own copy of it.
    log_message = 'This is a test.'
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, filename = tempfile.mkstemp()
    try:
        with os.fdopen(fd, 'wb') as mylog:
            p = pexpect.spawn('cat')
            p.logfile_read = mylog
            p.sendline(log_message)
            p.sendeof()
            p.expect(pexpect.EOF)
            # Detach the log before the file is closed.
            p.logfile_read = None
        with open(filename, 'rb') as f:
            lf = f.read()
    finally:
        # Remove the temporary file even when the steps above fail.
        os.unlink(filename)
    lf = lf.replace(_CAT_EOF, b'')
    self.assertEqual(lf, b'This is a test.\r\nThis is a test.\r\n')
def test_log_logfile_send(self):
    # Checks that logfile_send records exactly what was sent to ``cat``.
    log_message = b'This is a test.'
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, filename = tempfile.mkstemp()
    try:
        with os.fdopen(fd, 'wb') as mylog:
            p = pexpect.spawn('cat')
            p.logfile_send = mylog
            p.sendline(log_message)
            p.sendeof()
            p.expect(pexpect.EOF)
            # Detach the log before the file is closed.
            p.logfile_send = None
        with open(filename, 'rb') as f:
            lf = f.read()
    finally:
        # Remove the temporary file even when the steps above fail.
        os.unlink(filename)
    # Drop the EOF control character written by sendeof().
    lf = lf.replace(b'\x04', b'')
    self.assertEqual(lf.rstrip(), log_message)
def test_searcher_re(self):
    # This should be done programatically, if we copied and pasted output,
    # there wouldnt be a whole lot to test, really, other than our ability
    # to copy and paste correctly :-)

    # Case 1: compiled patterns only.
    plain_patterns = [
        re.compile('this'),
        re.compile('that'),
        re.compile('and'),
        re.compile('the'),
        re.compile('other'),
    ]
    ss = pexpect.searcher_re(plain_patterns)
    out = ('searcher_re:\n 0: re.compile("this")\n '
           '1: re.compile("that")\n 2: re.compile("and")\n '
           '3: re.compile("the")\n 4: re.compile("other")')
    rendered = ss.__str__()
    assert rendered == out, (ss.__str__(), out)

    # Case 2: TIMEOUT and EOF mixed in between the patterns.
    mixed_patterns = [
        pexpect.TIMEOUT,
        re.compile('this'),
        re.compile('that'),
        re.compile('and'),
        pexpect.EOF,
        re.compile('other'),
    ]
    ss = pexpect.searcher_re(mixed_patterns)
    out = ('searcher_re:\n 0: TIMEOUT\n 1: re.compile("this")\n '
           '2: re.compile("that")\n 3: re.compile("and")\n '
           '4: EOF\n 5: re.compile("other")')
    rendered = ss.__str__()
    assert rendered == out, (ss.__str__(), out)
def check_result(result, success_message):
    '''
    Check the result index of a pexpect operation.

    Index 0 is logged as a pass (with success_message) and yields True.
    Index 1 is logged as EOF, index 2 as a timeout and anything else as a
    generic failure; all of these yield False.
    '''
    if result == 0:
        logger.debug('Test passed: %s' % success_message)
        return True

    if result == 1:
        failure = 'EOF - Test failed'
    elif result == 2:
        failure = ' Timed out - Test failed'
    else:
        failure = ' Generic - Test failed'
    logger.warning(failure)
    return False
def goToLine(self, fh, linenumber):
    """
    Go to 'linenumber' of a huge text file in an (memory-)efficient way.

    The handle is rewound to the start and lines are read and discarded
    one at a time, so that the next read from 'fh' returns line
    'linenumber' (1-based).  Works for handles opened in text or binary
    mode.

    Raises IOError if 'linenumber' is smaller than 1 and
    OutOfScopeException if the file ends before the line is reached.
    """
    if linenumber < 1:
        raise IOError(
            "Specified linenumber '%d' is smaller than 1." % linenumber
        )

    fh.seek(0, os.SEEK_SET)

    # Skip lines until desired line is reached.
    for _ in range(linenumber - 1):
        # An empty result represents EOF: '' in text mode, b'' in binary
        # mode.  Comparing against "" alone would miss the binary case.
        if not fh.readline():
            raise OutOfScopeException(msg="goToLine error: ", line=linenumber)
def poll(self, timeout=None):
    """Report whether output from the child is available.

    Returns True if data is pending (already buffered or collected
    during this call) and False if nothing arrived before the timeout.
    A timeout of None is treated as 0.

    Raises EOFError when the child is done and no data is pending.
    """
    if self.pending_data:
        return True
    if self.done:
        raise EOFError('Child process is done')

    wait = 0 if timeout is None else timeout
    while True:
        try:
            # Match one character at a time and keep accumulating until
            # the expect call times out or reaches EOF.
            self.child.expect(r'.', timeout=wait)
        except pexpect.TIMEOUT:
            return bool(self.pending_data)
        except pexpect.EOF:
            self.pending_data += self.child.before
            self.done = True
            if not self.pending_data:
                raise EOFError('Child process is done')
            return True
        else:
            self.pending_data += self.child.before + self.child.after
def _execute(self, cmd, **kwargs):
    """
    Execute a command on a remote host.

    The command is fed to ``ssh`` through a here-document, using the
    login and control socket stored on this object.

    Parameters
    ----------
    cmd : string
        Command to be executed on remote host.
    kwargs : keywords
        Options to pass to subprocess.Popen.  They override the entries
        of ``self._subprocess_config``.

    Returns
    -------
    proc : Popen subprocess
        Subprocess used to run remote command.
    """
    # NOTE(review): ``cmd`` is pasted into the here-document unescaped; a
    # line consisting of "EOF" inside it would end the document early.
    template = 'ssh {login} -T -o ControlPath={socket} << EOF\n{cmd}\nEOF'
    options = dict(self._subprocess_config, **kwargs)
    remote_command = template.format(login=self._login_info,
                                     socket=self._socket_path,
                                     cmd=cmd)
    return run_in_subprocess(remote_command, check_output=True, **options)
def test_deploy():
    """Deploy from the project root, then again from two subdirectories.

    Each ``forge deploy`` run must print the expected progress markers in
    order, reach EOF and exit with status 0.
    """
    directory = mktree(FORGE_YAML + APP, MANGLE=MANGLE)
    os.environ["FORGE_PROFILE"] = "dev"

    forge = launch(directory, "forge deploy")
    forge.expect('built')
    forge.expect('forgetest/Dockerfile')
    forge.expect('pushed')
    forge.expect('forgetest-[0-9-]+:')
    forge.expect('rendered')
    forge.expect('service/forgetest-[0-9-]+')
    forge.expect('deployment/forgetest-[0-9-]+')
    forge.expect('deployed')
    forge.expect('forgetest-[0-9-]+')
    forge.expect(pexpect.EOF)
    assert forge.wait() == 0

    for sub in ("forgetest", "forgetest/subdir"):
        # Launch from the directory named by the loop variable; a fixed
        # path here would test the same directory twice.
        forge = launch(os.path.join(directory, sub), "forge deploy")
        forge.expect('rendered')
        forge.expect('service/forgetest-[0-9-]+')
        forge.expect('deployment/forgetest-[0-9-]+')
        forge.expect('deployed')
        forge.expect('forgetest-[0-9-]+')
        forge.expect(pexpect.EOF)
        assert forge.wait() == 0
def do_test_rebuilder(tree, path):
    """Build, change one source file, rebuild, and check the image output.

    Args:
        tree: tree description appended to FORGE_YAML for mktree().
        path: file, relative to the generated directory, that is
            overwritten between the two builds.
    """
    directory = mktree(FORGE_YAML + tree, MANGLE=MANGLE)

    forge = launch(directory, "forge build containers")
    forge.expect(pexpect.EOF)
    assert forge.wait() == 0
    assert run_image(directory).strip() == "hello"

    # "w" is the valid write mode; "write" is rejected by Python 3's open().
    with open(os.path.join(directory, path), "w") as f:
        f.write('print("goodbye")\n')

    forge = launch(directory, "forge build containers")
    forge.expect(pexpect.EOF)
    assert forge.wait() == 0
    assert run_image(directory).strip() == "goodbye"

    forge = launch(directory, "forge clean")
    forge.expect("docker kill ")
    forge.expect(pexpect.EOF)
    assert forge.wait() == 0
def halt(self):
    """Log in, send ``halt`` and wait up to 60 seconds for confirmation.

    Neither EOF nor a timeout while waiting is treated as an error; both
    are only reported on stdout.
    """
    self.login()
    self.child.send("halt\n")
    try:
        # Wait for text confirming the halt, or EOF
        self.child.expect("(The operating system has halted)|(entering state S5)", timeout=60)
    except pexpect.EOF:
        # Didn't see the text but got an EOF; that's OK.
        print("EOF")
    except pexpect.TIMEOUT as e:
        # This is unexpected but mostly harmless
        print("timeout waiting for halt confirmation: %s" % (e,))

# Calling this directly is deprecated, use Anita.login()
def testCommands(self):
    # Starts the CLI, waits for its prompt, asks it to exit and then
    # requires the process output to reach EOF.
    cli = pexpect.spawn(self.ravelCmd)
    cli.expect("ravel>")
    cli.sendline("exit")
    cli.expect(pexpect.EOF)
def _try_passwordless_openssh(server, keyfile):
    """Try passwordless login with shell ssh command.

    Returns True when ssh runs to EOF without prompting and False when a
    password prompt shows up.  Raises SSHException when ssh asks to
    confirm an unknown host key, and ImportError when pexpect is missing.
    """
    if pexpect is None:
        raise ImportError("pexpect unavailable, use paramiko")

    key_option = ' -i ' + keyfile if keyfile else ''
    cmd = 'ssh -f ' + server + key_option + ' exit'

    # pop SSH_ASKPASS from env
    env = {name: value for name, value in os.environ.items()
           if name != 'SSH_ASKPASS'}

    ssh_newkey = 'Are you sure you want to continue connecting'
    p = pexpect.spawn(cmd, env=env)
    while True:
        try:
            i = p.expect([ssh_newkey, _password_pat], timeout=.1)
        except pexpect.TIMEOUT:
            # Nothing decisive yet; keep polling.
            continue
        except pexpect.EOF:
            return True
        if i == 0:
            raise SSHException('The authenticity of the host can\'t be established.')
        return False
def exit(self):
    """Send Ctrl + D to exit, then wait for the session to reach EOF."""
    session = self.cli
    session.sendcontrol('d')
    session.expect(pexpect.EOF)
def run(self, cmd):
    """Run ``cmd`` on the remote host over ssh with password login.

    An unknown host key is accepted automatically.  The stored password
    is sent at the password prompt and everything the session printed
    before EOF is returned.

    Raises:
        Exception: ssh did not show the expected prompt in time.
    """
    ssh_newkey = 'Are you sure you want to continue connecting'
    child = pexpect.spawn('ssh -l %s -p %s %s %s' % (
        self._user, self._port, self._host, cmd))
    try:
        i = child.expect([pexpect.TIMEOUT, ssh_newkey, 'password: '])
        if i == 1:
            # SSH does not have the public key. Just accept it.
            child.sendline('yes')
            # Wait for the password prompt exactly once; expecting it a
            # second time would always end in a timeout.
            i = child.expect([pexpect.TIMEOUT, 'password: '])
        if i == 0:
            # timeout
            raise Exception('ssh timeout')
        child.sendline(self._pwd)
        child.expect(pexpect.EOF)
        return child.before
    finally:
        # Release the pty (and a possibly hanging ssh) on every path.
        child.close()
def logout(self):
    '''Send exit to the remote shell and close the session.

    If the shell answers that there are stopped jobs, exit is sent a
    second time before waiting for EOF.
    '''
    self.sendline("exit")
    matched = self.expect([EOF, "(?i)there are stopped jobs"])
    stopped_jobs_reported = (matched == 1)
    if stopped_jobs_reported:
        self.sendline("exit")
        self.expect(EOF)
    self.close()