我们从 Python 开源项目中提取了以下 48 个代码示例，用于说明如何使用 pexpect.spawn()。
def testHelpMessageForSQLitePlugin(self):
    """Check that ``sqlite --help`` prints the expected usage text.

    The comparison is only performed on Linux; on every other platform
    NotImplementedError is raised.
    """
    helper = end_to_end_test_helper.EndToEndTestHelper(
        'not needed', 'not needed')

    if platform.system() not in ['Linux']:
        raise NotImplementedError("test only implemented for linux platform")

    # The pieces are joined verbatim; expect_exact() does a literal match.
    expected_help = ''.join([
        'Usage: main.py sqlite [OPTIONS]\r\n\r\n',
        'Options:\r\n ',
        '--path TEXT The path to plaso\r\n ',
        '--name TEXT The plugin name\r\n ',
        '--testfile TEXT The testfile path\r\n ',
        '--sql / --no-sql The output example flag for the SQL Query for the ',
        'plugin.\r\n ',
        '--help Show this message and exit.',
    ])
    help_command = 'python {0} sqlite --help'.format(helper.MAIN_PATH)
    process = pexpect.spawn(help_command)
    process.expect_exact(expected_help)
def test_vi_mode(self):
    """Exercise vi key bindings in the interactive prompt.

    Writes a config enabling vi mode, starts the prompt binary and edits
    a mistyped command with vi keystrokes before exiting.
    """
    escape = '\x1b'

    self.write_config('vi = True\n')
    prompt_binary = get_http_prompt_path()
    session = pexpect.spawn(prompt_binary, env=os.environ)
    session.expect_exact('http://localhost:8000> ')

    # Type 'htpie', leave insert mode (ESC), go two characters left (hh)
    # and insert (i) a 't' so the submitted line reads 'httpie'.
    session.send('htpie')
    session.send(escape)
    session.sendline('hhit')
    session.expect_exact('http http://localhost:8000')

    # Back to insert mode and type 'exit'.
    session.send(escape)
    session.send('i')
    session.sendline('exit')
    session.expect_exact('Goodbye!', timeout=20)
    session.close()
def testHelpMessage(self):
    """Check that the top-level ``--help`` option prints the expected text.

    The comparison is only performed on Linux; on every other platform
    NotImplementedError is raised.
    """
    helper = end_to_end_test_helper.EndToEndTestHelper(
        'not needed', 'not needed')

    if platform.system() not in ['Linux']:
        raise NotImplementedError("test only implemented for linux platform")

    # The pieces are joined verbatim; expect_exact() does a literal match.
    expected_help = ''.join([
        'Usage: main.py [OPTIONS] COMMAND [ARGS]...\r\n\r\n',
        'Options:\r\n',
        ' --help Show this message and exit.\r\n\r\n',
        'Commands:\r\n',
        ' sqlite',
    ])
    help_command = 'python {0} --help'.format(helper.MAIN_PATH)
    process = pexpect.spawn(help_command)
    process.expect_exact(expected_help)
def testCommands(self):
    """Send a series of commands to the Ravel CLI and check each reply."""
    shell = pexpect.spawn(self.ravelCmd)
    shell.expect("ravel>")

    # (command typed at the prompt, text expected in the reply)
    exchanges = (
        ("help", "Commands"),
        ("p SELECT * FROM hosts;", "hid"),
        ("stat", "app path"),
        ("apps", "offline"),
    )
    for command, reply in exchanges:
        shell.sendline(command)
        shell.expect(reply)

    shell.sendline("exit")
    shell.sendeof()
    time.sleep(1)

# Part 3 - Orchestration
def GetGcloud(args, project=None, service=None):
    """Build a gcloud command line as a list of arguments.

    Args:
      args: iterable of sub-command words and arguments, appended last.
      project: optional project; when truthy, ``--project <project>`` is
        inserted before ``args``.
      service: optional gcloud service (for example ``compute``); when
        truthy it is placed directly after ``gcloud``.

    Returns:
      The command as a list, starting with ``"gcloud"``.
    """
    command = ["gcloud"]
    if service:
        command += [service]
    if project:
        command += ["--project", project]
    # In-place += on a list accepts any iterable, exactly like extend().
    command += args
    return command

# TODO(sohamcodes): pexpect.spawn can be changed to subprocess call
# However, timeout for subprocess is available only on python3
# So, we can implement it later.
def RunCommand(args, timeout=None, logfile=None):
    """Run a command through pexpect.spawn and wait for it to finish.

    Args:
      args: command with arguments as an array; ``args[0]`` is the
        executable and the remaining items are its arguments.
      timeout: timeout handed to pexpect.spawn.
      logfile: an opened filestream to write the output to.

    Raises:
      RuntimeError: the command's exit status is not zero. The message
        contains the output collected before EOF and the exit status.
      pexpect.TIMEOUT: EOF was not seen within ``timeout``.

    Returns:
      0 when the command exits with status zero.
    """
    child = pexpect.spawn(args[0], args=args[1:], timeout=timeout,
                          logfile=logfile)
    try:
        child.expect(pexpect.EOF)
    finally:
        # Always release the pty, even when expect() raises (e.g. TIMEOUT).
        child.close()

    if child.exitstatus:
        # print(x) with a single argument behaves the same on Python 2 and 3.
        print(args)
        raise RuntimeError(("Error: {}\nProblem running command. "
                            "Exit status: {}").format(child.before,
                                                      child.exitstatus))
    return 0
def bash(command="bash"):
    """Start a bash shell and return a :class:`REPLWrapper` object.

    The shell is started with the ``bashrc.sh`` file located next to this
    module as its rc file, with echo disabled and UTF-8 decoding.
    """
    bashrc = os.path.join(os.path.dirname(__file__), 'bashrc.sh')
    child = pexpect.spawn(command, ['--rcfile', bashrc], echo=False,
                          encoding='utf-8')

    # If the user runs 'env', the value of PS1 will be in the output. To avoid
    # replwrap seeing that as the next prompt, we'll embed the marker characters
    # for invisible characters in the prompt; these show up when inspecting the
    # environment variable, but not when bash displays the prompt.
    # The backslashes are written escaped: '\[' and '\$' are invalid escape
    # sequences in a normal string literal and trigger warnings on newer
    # Python versions. The resulting string values are unchanged.
    invisible_marker = u'\\[\\]'
    ps1 = PEXPECT_PROMPT[:5] + invisible_marker + PEXPECT_PROMPT[5:]
    ps2 = (PEXPECT_CONTINUATION_PROMPT[:5] + invisible_marker +
           PEXPECT_CONTINUATION_PROMPT[5:])
    prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)

    return REPLWrapper(child, u'\\$', prompt_change,
                       extra_init_cmd="export PAGER=cat")
def disconnect(child):
    """Disconnect a session object and return it.

    - A ``WarriorCli`` instance is disconnected through its own
      ``disconnect`` method and the value that method returns is returned.
    - A raw ``pexpect.spawn`` object is wrapped in a temporary
      ``WarriorCli``/``PexpectConnect`` pair, disconnected through it, and
      the same ``child`` is returned.
    - Any other object is returned untouched.
    """
    print_warning("This method is obsolete and will be deprecated soon. Please"
                  " use 'disconnect' method of 'PexpectConnect' class "
                  "in 'warrior/Framework/ClassUtils/WNetwork/warrior_cli_class.py'")
    cli_module = WNetwork.warrior_cli_class

    if isinstance(child, cli_module.WarriorCli):
        return child.disconnect()

    if isinstance(child, pexpect.spawn):
        wrapper = cli_module.WarriorCli()
        wrapper.conn_obj = cli_module.PexpectConnect()
        wrapper.conn_obj.target_host = child
        wrapper.disconnect()

    return child
def _send_cmd(obj_session, **kwargs):
    """Send a command through whichever kind of session object is given.

    Supported session types:
      - ``WarriorCli``: its ``_send_cmd`` is called with ``kwargs``.
      - ``pexpect.spawn``: wrapped in a temporary ``WarriorCli`` /
        ``PexpectConnect`` pair whose ``_send_cmd`` is called with ``kwargs``.
      - ``ssh_utils_class.SSHComm``: ``kwargs['command']`` is passed to
        ``get_response`` and the response is printed.

    Returns:
        tuple: ``(result, response)`` from the underlying session. For an
        unsupported session type a warning is printed and ``(False, "")``
        is returned.
    """
    cli_module = WNetwork.warrior_cli_class

    # Defaults for the unsupported-type branch; without them the final
    # return raised UnboundLocalError.
    result, response = False, ""

    if isinstance(obj_session, cli_module.WarriorCli):
        result, response = obj_session._send_cmd(**kwargs)
    elif isinstance(obj_session, pexpect.spawn):
        wc_obj = cli_module.WarriorCli()
        wc_obj.conn_obj = cli_module.PexpectConnect()
        wc_obj.conn_obj.target_host = obj_session
        result, response = wc_obj._send_cmd(**kwargs)
    elif isinstance(obj_session, ssh_utils_class.SSHComm):
        command = kwargs.get('command')
        result, response = obj_session.get_response(command)
        print_info(response)
    else:
        print_warning('session object type is not supported')
    return result, response
def test_debug_bots(self, env):
    """Make sure the debug server runs to completion with bots."""
    server = pexpect.spawn(
        'dallinger',
        ['debug', '--verbose', '--bot'],
        env=env,
    )
    server.logfile = sys.stdout

    # (output to wait for, seconds allowed), in the order it must appear.
    milestones = (
        ('Server is running', 300),
        ('Recruitment is complete', 600),
        ('Experiment completed', 60),
        ('Local Heroku process terminated', 10),
    )
    try:
        for text, limit in milestones:
            server.expect_exact(text, timeout=limit)
    finally:
        # Best-effort shutdown: send Ctrl-C and drain whatever is left.
        try:
            server.sendcontrol('c')
            server.read()
        except IOError:
            pass
def sshcmd(cmd, host, password):
    '''Run a single command on ``host`` over ssh.

    Waits for a password prompt, sends ``password`` and, unless the reply
    starts with "Sorry", sends ``cmd``. Returns 1 when the password is
    rejected and None otherwise.
    '''
    # NOTE(review): ``expect`` is presumably an alias of the pexpect module
    # imported elsewhere in the file - confirm.
    session = expect.spawn("ssh " + host)

    if session.expect([r'.+password:.*']) != 0:
        print('Not expected')
        return None

    session.sendline(password)
    if session.expect([r'Sorry.+', r'.*']) == 0:
        print('Wrongs password')
        return 1

    print('inside')
    session.sendline(cmd)
    return None
def launchMiraiScan(ip):
    """Run ``iotScanner.pl`` against ``ip`` and record the outcome.

    The first scanner message that matches decides the status passed to
    ``setStatus``: the two default-password messages (indexes 1 and 2) give
    'Vulnerable', everything else gives 'Non-vulnerable'. When the TCP
    connection fails (index 0) the elapsed run time is printed as well.
    """
    global status
    scan_command = 'perl iotScanner.pl %s' % ip
    started = time.time()
    scanner = pexpect.spawn('/bin/bash', ['-c', scan_command], timeout=600)

    outcomes = [
        'failed to establish TCP connection',
        'doesnot have any password',
        'still has default password',
        'has changed password',
        'didnot find dev type after trying all devices',
        'due to 404 response',
        'failed to establish TCP connection',
        'http redirect to',
        'unexpected status code',
        'didnot find devType for',
        'unexpected partial url',
        TERMINAL_PROMPT,
        COMMAND_PROMPT,
    ]
    matched = scanner.expect(outcomes)

    if matched in (1, 2):
        setStatus('Vulnerable')
        return

    if matched == 0:
        print('Run time:', round((time.time() - started), 3))
    setStatus('Non-vulnerable')
def xonsh_shell(exe_dir, shell_path):
    """Start an interactive xonsh session for a virtual environment.

    ``VIRTUAL_ENV`` is set to the parent directory of ``exe_dir``. On
    Windows the shell runs through ``subprocess.run``; elsewhere it runs
    under pexpect, with window-size changes forwarded and ``exe_dir``
    pushed to the front of ``$PATH``.

    Returns the shell's exit status.
    """
    executable = shell_path or 'xonsh'
    shell_args = ['-i', '-D',
                  'VIRTUAL_ENV={}'.format(os.path.dirname(exe_dir))]

    if ON_WINDOWS:
        completed = subprocess.run(
            [executable] + shell_args,
            shell=NEED_SUBPROCESS_SHELL
        )
        return completed.returncode

    terminal = pexpect.spawn(
        executable,
        args=shell_args,
        dimensions=get_terminal_dimensions()
    )

    def sigwinch_passthrough(sig, data):
        # Keep the child's window size in step with the real terminal.
        terminal.setwinsize(*get_terminal_dimensions())

    signal.signal(signal.SIGWINCH, sigwinch_passthrough)

    # Just in case pyenv works with xonsh, supersede it.
    terminal.sendline('$PATH.insert(0, "{}")'.format(exe_dir))
    terminal.interact(escape_character=None)
    terminal.close()
    return terminal.exitstatus
def start_MAVProxy_SITL(atype, aircraft=None, setup=False,
                        master='tcp:127.0.0.1:5760', options=None,
                        logfile=sys.stdout):
    """Launch mavproxy connected to a SITL instance.

    The executable comes from the ``MAVPROXY_CMD`` environment variable
    (default ``mavproxy.py``). ``aircraft`` defaults to ``test.<atype>``;
    ``options`` is appended verbatim to the command line when given.

    Returns the pexpect child, registered with ``pexpect_autoclose``.
    """
    import pexpect
    global close_list

    mavproxy = os.getenv('MAVPROXY_CMD', 'mavproxy.py')
    pieces = [mavproxy + ' --master=%s --out=127.0.0.1:14550' % master]
    if setup:
        pieces.append(' --setup')
    if aircraft is None:
        aircraft = 'test.%s' % atype
    pieces.append(' --aircraft=%s' % aircraft)
    if options is not None:
        pieces.append(' ' + options)
    cmd = ''.join(pieces)

    child = pexpect.spawn(cmd, logfile=logfile, encoding=ENCODING, timeout=60)
    child.delaybeforesend = 0
    pexpect_autoclose(child)
    return child
def behind_source(org_config_filename, verbose=False, dry_run=False):
    """Build a source object backed by a batch-mode Emacs REPL.

    Emacs is started in batch mode, loads ``org_config_filename`` and
    ``ts-org-interaction.el`` and evaluates ``(ts-repl)``. The REPL is
    wrapped in a ``REPLWrapper`` keyed on the "Lisp expression: " prompt.
    ``dry_run`` selects ``o.DryRunSource`` instead of ``o.Source``.
    """
    emacs_args = ['-batch',
                  '-l', org_config_filename,
                  '-l', 'ts-org-interaction.el',
                  '--eval=(ts-repl)']
    emacs = pexpect.spawn('emacs', emacs_args, encoding='utf-8')
    repl = pexpect.replwrap.REPLWrapper(emacs, "Lisp expression: ", None)

    source_class = o.DryRunSource if dry_run else o.Source
    source_class.make_fn = make_fn
    source = source_class.from_emacs_repl(repl, verbose)

    source.get_tree = lambda: source.get_all_items(
        ['asana_id', 'asana_project_id'])
    return source
def _play(self):
    """Start mpv on the current track's URL and monitor it in a daemon thread."""
    # Reset progress and flag the player as loading while mpv starts.
    self.track_current = 0
    self.track_percent = 0
    self.loading = True

    url = self.track.url
    mpv_args = ["--no-ytdl", "%s" % url]
    self.playing = pexpect.spawn('mpv', mpv_args, timeout=None)

    self.active = True
    self.loading = False

    monitor = threading.Thread(target=self._monitor,
                               args=(self.playing, self))
    monitor.daemon = True
    monitor.start()
def change_password(old=None, new=None):
    """Change the current user's password by driving ``passwd``.

    Args:
        old: current password; when None, both passwords are obtained from
            ``get_passes()``.
        new: password to set.

    Returns:
        ``(old, new)`` when ``passwd`` reports success, otherwise False.
    """
    if old is None:
        old, new = get_passes()

    p = pexpect.spawn('passwd')
    p.expect('password')
    p.sendline(old)
    p.expect(['New', 'incorrect', 'error'])
    p.sendline(new)
    try:
        p.expect('ew password:', timeout=1)
        if p.match is None:
            # %-formatting keeps the output identical on Python 2 and 3.
            print('%s %s' % (p.buffer, 'new password'))
        else:
            p.sendline(new)
            p.expect(['success'], timeout=1)
            if p.match is not None:
                return old, new
    except (pexpect.TIMEOUT, pexpect.EOF):
        # Only the failures expect() signals are treated as "change failed";
        # a bare except also swallowed KeyboardInterrupt and SystemExit.
        print('%s %s' % (p.buffer, 'top level'))
    return False
def tar2db(firmware_id):
    """Populates the db with information related to the filesystem.

    Runs ``TAR2DB_COMMAND`` for ``firmware_id`` and interprets its output.

    Returns:
        True when the information was written, or had already been written
        earlier (a "Key ... already exists" error); False when the command
        reports a missing file.
    """
    print(bcolors.OKBLUE + "[-] Writing filesystem information into database..." + bcolors.ENDC)
    command = TAR2DB_COMMAND.format(FIRMADYNE_PATH, firmware_id, OUTPUT_DIR)
    print(bcolors.ITALIC + command + bcolors.ENDC)
    proc = pexpect.spawn(command)

    # either an error is raised because keys already exist
    # which means the info has already been written in the db
    # or the command terminates properly
    index = proc.expect(['Key.*already exists',
                         'No such file or directory: .*\n',
                         pexpect.EOF])
    if index == 0:
        print(bcolors.WARNING + "[!] This step was already performed earlier..." + bcolors.ENDC)
        return True

    if index == 1:
        matched = proc.after
        # Without an encoding, spawn yields bytes on Python 3; splitting
        # bytes with a str separator raises TypeError, so decode first.
        if isinstance(matched, bytes):
            matched = matched.decode('utf-8', 'replace')
        missing_file = matched.split('\n')[0].split(':')[1].strip()
        print(bcolors.FAIL + "[!] The file {} does not exist...".format(missing_file) + bcolors.ENDC)
        return False

    print(bcolors.OKGREEN + "[+] Filesystem information successfully written!" + bcolors.ENDC)
    return True
def network_setup(firmware_id, arch):
    """Determines the network configuration of the firmware by emulating it
    for a certain amount of time.

    Returns:
        True when at least one interface is reported, False when the
        reported interface list is empty.
    """
    runtime = 60
    print(bcolors.OKBLUE + "[-] Determining the network configuration of the firmware..." + bcolors.ENDC)
    print(bcolors.OKBLUE + "[-] The firmware will now be running for {} seconds...".format(runtime) + bcolors.ENDC)
    command = INFERNETWORK_COMMAND.format(FIRMADYNE_PATH, firmware_id) + " " + arch
    print(bcolors.ITALIC + command + bcolors.ENDC)
    setup = pexpect.spawn(command, timeout=runtime + 5)

    # info should be provided as regards the interfaces available to access
    # the emulated firmware
    # Raw string: '\[' and '\]' are invalid escapes in a normal literal.
    # '\r?' also accepts the CRLF line ending a pty normally produces.
    setup.expect(r'Interfaces: \[(.*)\]\r?\n')

    interfaces = setup.match.group(1)
    # Without an encoding the match is bytes on Python 3, and bytes never
    # compare equal to "", so the empty case was reported as a success.
    if isinstance(interfaces, bytes):
        interfaces = interfaces.decode('utf-8', 'replace')

    if interfaces == "":
        print(bcolors.WARNING + "[!] No network interface could be determined..." + bcolors.ENDC)
        return False

    print(bcolors.OKGREEN + "[+] Your firmware will be accessible at {}!".format(interfaces) + bcolors.ENDC)
    return True
def _bash_repl(command="bash", remove_ansi=True):
    """Start a bash shell and return a :class:`REPLWrapper` object.

    ``remove_ansi`` is forwarded unchanged to ``REPLWrapper``.
    """
    # `repl_bashrc.sh` suppresses user-defined PS1
    bashrc = os.path.join(os.path.dirname(__file__), 'repl_bashrc.sh')
    child = pexpect.spawn(command, ['--rcfile', bashrc], echo=False,
                          encoding='utf-8')

    # If the user runs 'env', the value of PS1 will be in the output. To avoid
    # replwrap seeing that as the next prompt, we'll embed the marker characters
    # for invisible characters in the prompt; these show up when inspecting the
    # environment variable, but not when bash displays the prompt.
    # The backslashes are written escaped: '\[' and '\$' are invalid escape
    # sequences in a normal string literal and trigger warnings on newer
    # Python versions. The resulting string values are unchanged.
    invisible_marker = u'\\[\\]'
    ps1 = PEXPECT_PROMPT[:5] + invisible_marker + PEXPECT_PROMPT[5:]
    ps2 = (PEXPECT_CONTINUATION_PROMPT[:5] + invisible_marker +
           PEXPECT_CONTINUATION_PROMPT[5:])
    prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)

    return REPLWrapper(child, u'\\$', prompt_change, remove_ansi=remove_ansi,
                       extra_init_cmd="export PAGER=cat")
def connectToUserWifi():
    """Connect to the user's wifi network.

    First tries ``sudo wifi autoconnect``. If that fails, the network name
    and passkey are fetched from the configuration endpoint, any stored
    ``userWifi`` entry is removed, and the network is both connected to
    (ad hoc) and stored under the name ``userWifi``.
    """
    try:
        child = pexpect.spawn('sudo wifi autoconnect')
        child.expect('Connecting *')
        time.sleep(20)
    except Exception:
        # Exception (rather than a bare except) still covers the pexpect
        # errors but lets KeyboardInterrupt and SystemExit propagate.
        response = requests.get('http://joedye.me/pi-order/config').json()
        # NOTE(review): the body is decoded twice (.json() and then
        # json.loads) - presumably the endpoint returns a JSON-encoded
        # string; confirm against the service.
        response = json.loads(response)
        deleteUserWifiIfPresent()

        child = pexpect.spawn('sudo wifi connect --ad-hoc ' + response['wifi-network'])
        child.expect('passkey>')
        child.sendline(str(response['wifi-password']))

        childForAdd = pexpect.spawn('sudo wifi add userWifi ' + response['wifi-network'])
        childForAdd.expect('passkey>')
        childForAdd.sendline(str(response['wifi-password']))
        time.sleep(30)
def connect(self):
    """Open a bash session and ssh through every hop to the target host.

    ``self.hostname`` ends up as the last host an ssh command was sent
    for. The PS1 export command is sent afterwards and the method waits
    for the resulting prompt. Returns ``self``.
    """
    ssh_template = 'ssh -tt {host}'
    self.con = pexpect.spawn('/usr/bin/env bash')

    for hop in self.hops:
        self.con.sendline(ssh_template.format(host=hop))
        self.hostname = hop

    if self.target_host:
        self.con.sendline(ssh_template.format(host=self.target_host))
        self.hostname = self.target_host

    self.con.sendline(self.ps1_export_cmd)
    self.con.expect(self.ps1_re)
    return self
def run(command, fail_on_error=False, interactively=False):
    """Run ``command`` through ``/bin/sh -c`` under pexpect.

    Everything read from and sent to the child is captured in
    ``child.logfile_read`` / ``child.logfile_send``.

    Args:
        command: shell command line to run.
        fail_on_error: when True (and not ``interactively``), raise if the
            command exits with a status greater than zero.
        interactively: when True, return right after spawning so the caller
            can drive the child; otherwise wait for the command to finish.

    Returns:
        dict with the pexpect child under the key ``'child'``.

    Raises:
        Exception: the command failed and ``fail_on_error`` is set; the
            message holds the captured output and the exit code.
    """
    child = pexpect.spawn('/bin/sh', ['-c', command], echo=False)
    child.logfile_read = StringIO.StringIO()
    child.logfile_send = StringIO.StringIO()

    if not interactively:
        child.expect(pexpect.EOF)
        # exitstatus is only filled in once the child has been reaped;
        # reading it straight after EOF can still give None, which
        # silently passes on Python 2 and raises TypeError on Python 3.
        child.close()
        # exitstatus stays None when the child was killed by a signal.
        if fail_on_error and (child.exitstatus or 0) > 0:
            raise Exception(
                '%s (exit code %s)' % (
                    child.logfile_read.getvalue(),
                    child.exitstatus
                )
            )
    return {
        'child': child,
    }
def main():
    """Poll ``ps`` forever and restart the ssh tunnel when it is missing.

    Each pass looks for '/usr/bin/ssh' in the ``ps`` output. If ``ps``
    reaches EOF without showing it, ``start_tunnel()`` is called.
    """
    while True:
        ps = pexpect.spawn('ps')
        time.sleep(1)
        outcome = ps.expect(['/usr/bin/ssh', pexpect.EOF, pexpect.TIMEOUT])

        if outcome == 2:
            print('TIMEOUT in ps command...')
            print(str(ps))
            time.sleep(13)

        if outcome != 1:
            # Tunnel found (or ps timed out): just wait for the next poll.
            time.sleep(7)
            continue

        print(time.asctime(), end=' ')
        print('restarting tunnel')
        start_tunnel()
        time.sleep(11)
        print('tunnel OK')
def clear_bond(self, address=None):
    """Use the 'bluetoothctl' program to erase a stored BLE bond.

    Args:
        address: address of the device whose bond is removed. It is
            upper-cased before being sent, so the ``None`` default is not
            usable as-is and raises AttributeError.
    """
    con = pexpect.spawn('sudo bluetoothctl')
    try:
        con.expect("bluetooth", timeout=1)
        log.info("Clearing bond for %s", address)
        con.sendline("remove " + address.upper())
        try:
            con.expect(
                ["Device has been removed", "# "],
                timeout=.5
            )
        except pexpect.TIMEOUT:
            log.error("Unable to remove bonds for %s: %s",
                      address, con.before)
        else:
            # Only report success when bluetoothctl actually answered;
            # previously this was logged even after the timeout error.
            log.info("Removed bonds for %s", address)
    finally:
        # Do not leave the bluetoothctl process and its pty behind.
        con.close()
def run_interactive_task(cmd, logger, msg):
    """Start a task interactively and log that it was started.

    The command is spawned with ``pexpect.spawnu`` and logged at debug
    level. Nothing checks that the process started or is still running,
    and nothing kills it; both are left to the caller.

    :param cmd: Exact command to be executed
    :param logger: Logger to write details to
    :param msg: Message to be shown to user
    :returns: ``pexpect.child`` object
    """
    logger.info(msg)
    logger.debug('%s%s', CMD_PREFIX, cmd)
    task = pexpect.spawnu(cmd)

    # At debug verbosity, mirror everything the child prints to stdout.
    debug_output = settings.getValue('VERBOSITY') == 'debug'
    if debug_output:
        task.logfile_read = sys.stdout
    return task
def test_place_match(place):
    """Add and delete a match on place ``test``, then check what ``show`` prints."""
    client = 'python -m labgrid.remote.client -p test '

    # Both commands only need to run to completion successfully.
    for subcommand in ('add-match e1/g1/r1 e2/g2/*', 'del-match e1/g1/r1'):
        with pexpect.spawn(client + subcommand) as spawn:
            spawn.expect(pexpect.EOF)
            spawn.close()
            assert spawn.exitstatus == 0

    with pexpect.spawn(client + 'show') as spawn:
        spawn.expect(" matches:")
        spawn.expect(" e2/g2/*")
        spawn.expect(pexpect.EOF)
        spawn.close()
        assert spawn.exitstatus == 0
def test_autoinstall_simple(tmpdir):
    """Run ``labgrid.autoinstall.main --once`` with a minimal config.

    The config defines one target without resources or drivers and a
    handler that prints a marker line; the test waits for that marker and
    checks that the process exits with status 0.
    """
    c = tmpdir.join("config.yaml")
    # NOTE(review): the YAML had been flattened onto a single line, which is
    # not a valid document ("targets: test1: resources: {}"). The block
    # structure below is reconstructed from the key order - confirm the
    # exact whitespace against the upstream test.
    c.write("""
    targets:
        test1:
            resources: {}
            drivers: {}
    autoinstall:
        handler: |
            print("handler-test-output")
    """)
    with pexpect.spawn('python -m labgrid.autoinstall.main --once {}'.format(c)) as spawn:
        spawn.expect("handler-test-output")
        spawn.expect(pexpect.EOF)
        spawn.close()
        assert spawn.exitstatus == 0
def __init__(self, cmd_or_spawn, orig_prompt, prompt_change,
             new_prompt=PEXPECT_PROMPT,
             continuation_prompt=PEXPECT_CONTINUATION_PROMPT):
    """Wrap a child process as a prompt-driven REPL.

    ``cmd_or_spawn`` is either a command string, which is spawned with
    echo disabled, or an existing spawn object, whose echo is switched
    off if it is on. When ``prompt_change`` is None the original prompt
    is kept; otherwise it is formatted with the new prompts and applied
    through ``set_prompt``. Finally the first prompt is awaited.
    """
    given_command = isinstance(cmd_or_spawn, str)
    self.child = (pexpect.spawnu(cmd_or_spawn, echo=False)
                  if given_command else cmd_or_spawn)

    if self.child.echo:
        # An existing spawn instance still echoes; turn that off so our
        # own input is not repeated in the output.
        self.child.setecho(False)
        self.child.waitnoecho()

    if prompt_change is not None:
        self.set_prompt(orig_prompt,
                        prompt_change.format(new_prompt, continuation_prompt))
        self.prompt = new_prompt
    else:
        self.prompt = orig_prompt

    self.continuation_prompt = continuation_prompt
    self._expect_prompt()
def test_expect(self):
    """Compare ``ls -l /bin`` read line by line via expect() with subprocess output."""
    def squash_newlines(data):
        # Fold CRLF / CR / blank-line differences between pty and pipe.
        return (data.replace(b'\r\n', b'\n')
                    .replace(b'\r', b'\n')
                    .replace(b'\n\n', b'\n')
                    .rstrip())

    listing = subprocess.Popen(args=['ls', '-l', '/bin'],
                               stdout=subprocess.PIPE)
    the_old_way = listing.communicate()[0].rstrip()

    child = pexpect.spawn('ls -l /bin')
    pieces = []
    while True:
        index = child.expect([b'\n', pexpect.EOF])
        pieces.append(child.before)
        if index == 1:
            break

    the_new_way = squash_newlines(b''.join(pieces).rstrip())
    the_old_way = squash_newlines(the_old_way)
    assert the_old_way == the_new_way, hex_diff(the_old_way, the_new_way)
def test_expect_exact(self):
    """Check expect_exact() against subprocess output and on a literal '.?'."""
    def squash_newlines(data):
        # Fold CRLF / CR / blank-line differences between pty and pipe.
        return (data.replace(b'\r\n', b'\n')
                    .replace(b'\r', b'\n')
                    .replace(b'\n\n', b'\n')
                    .rstrip())

    listing = subprocess.Popen(args=['ls', '-l', '/bin'],
                               stdout=subprocess.PIPE)
    the_old_way = listing.communicate()[0].rstrip()

    child = pexpect.spawn('ls -l /bin')
    pieces = []
    while True:
        index = child.expect_exact([b'\n', pexpect.EOF])
        pieces.append(child.before)
        if index == 1:
            break

    the_new_way = squash_newlines(b''.join(pieces))
    the_old_way = squash_newlines(the_old_way)
    assert the_old_way == the_new_way, hex_diff(the_old_way, the_new_way)

    # '.?' must be matched as plain text, not as a regular expression.
    echo = pexpect.spawn('echo hello.?world')
    echo.expect_exact(b'.?')
    self.assertEqual(echo.before, b'hello')
    self.assertEqual(echo.after, b'.?')
def test_signal_handling(self):
    '''
    Check that expect() survives being interrupted by a signal.

    Such an interrupt is usually a SIGWINCH generated when a window is
    resized; this test substitutes an ALARM signal because it is much
    easier to trigger and is treated the same way. To make sure the alarm
    fires during the expect call, it is set to go off after 1 second while
    the spawned process sleeps for 2 seconds before sending the expected
    output.
    '''
    # Install a do-nothing handler for the alarm signal.
    signal.signal(signal.SIGALRM, lambda signum, frame: None)

    sleeper = pexpect.spawn('%s sleep_for.py 2' % self.PYTHONBIN, timeout=5)
    sleeper.expect('READY')
    signal.alarm(1)
    sleeper.expect('END')
def get_proc(shell, homedir):
    """Spawn ``shell`` under pexpect with a minimal, controlled environment.

    Args:
        shell: sequence whose first item is the executable and whose
            remaining items are its arguments.
        homedir: value used for ``HOME`` (converted with ``str``).

    Returns:
        The pexpect child, created with a 5 second timeout.
    """
    python_bin_dir = os.path.dirname(sys.executable)
    # Join with an explicit separator. Plain concatenation only worked
    # while os.defpath started with ':'; where it starts with '/bin' the
    # interpreter directory and '/bin' were fused into one bogus entry.
    # Stripping a leading separator keeps the result identical on
    # interpreters that still have it.
    search_path = os.pathsep.join(
        (python_bin_dir, os.defpath.lstrip(os.pathsep)))

    return pexpect.spawn(
        shell[0],
        list(shell[1:]),
        timeout=5,
        env={
            'COVERAGE_PROCESS_START': os.environ.get(
                'COVERAGE_PROCESS_START', '',
            ),
            'PS1': PS1,
            'TOP': os.environ.get('TOP', ''),
            'HOME': str(homedir),
            'PATH': search_path,
        },
    )
def spawn(command, args):
    """Run ``command`` with ``os.spawnvp``, wait for it, and fail loudly.

    The command line is printed first. ``args`` follows the os.spawnvp
    convention: ``args[0]`` is the program name, so only ``args[1:]`` is
    shown after ``command``.

    Raises:
        RuntimeError: the command returned a non-zero status.
    """
    # %-formatting gives the same output on Python 2 and 3; the original
    # print statement was Python 2 only.
    print("%s %s" % (command, ' '.join(args[1:])))
    ret = os.spawnvp(os.P_WAIT, command, args)
    if ret != 0:
        raise RuntimeError("could not run " + command)

# Subclass pexpect.spawn to add logging of expect() calls
def expect(self, pattern, *args, **kwargs):
    """Call ``pexpect.spawn.expect`` and record the call in the structured log.

    One ``expect(<pattern>)`` line is written before waiting and one
    ``match(<matched text>)`` line afterwards. Returns whatever the
    underlying ``expect`` returns.
    """
    log = self.structured_log_f
    # write() with an explicit newline replaces the Python 2 only
    # "print >>file" form and produces the same output.
    log.write("expect(" + repr(pattern) + ")\n")
    r = pexpect.spawn.expect(self, pattern, *args, **kwargs)

    # When the matching entry is pexpect.EOF or pexpect.TIMEOUT, self.match
    # is that exception class rather than a regex match object, so calling
    # group(0) on it unconditionally would raise AttributeError.
    matched = self.match
    matched_text = matched.group(0) if hasattr(matched, "group") else matched
    log.write("match(" + repr(matched_text) + ")\n")
    return r

# Subclass urllib.FancyURLopener so that we can catch
# HTTP 404 errors
def make_iso(self):
    """Run the download step, then build the ISO image with ``makefs``.

    The image is written to ``self.iso_path()`` from the parent directory
    of the resolved ``<download_local_mi_dir>/<arch>`` path, and the image
    path is added to ``self.tempfiles``.
    """
    self.download()

    iso_target = self.iso_path()
    arch_dir = os.path.join(self.download_local_mi_dir(), self.arch())
    source_root = os.path.dirname(os.path.realpath(arch_dir))

    spawn(makefs[0], makefs + [iso_target, source_root])
    self.tempfiles.append(self.iso_path())

# Get the architecture name. This is a hardcoded default for use
# by the obsolete subclasses; the "URL" class overrides it.
def __del__(self):
    """Run ``<frontend> destroy <name>`` for this domU on finalization."""
    # %-formatting gives the same output on Python 2 and 3; the original
    # print statement was Python 2 only.
    print("destroying domU %s" % (self.name,))
    spawn(self.frontend, [self.frontend, "destroy", self.name])
def slog(self, message):
    """Pass ``message`` and this object's structured log file to ``slog_info``."""
    log_file = self.structured_log_f
    slog_info(log_file, message)

# Wrapper around pexpect.spawn to let us log the command for
# debugging. Note that unlike os.spawnvp, args[0] is not
# the name of the command.
def testStartupOptions(self):
    """Start Ravel with several command-line options and check its output."""
    base = "python {0} ".format(resource_file("ravel.py"))

    # --help prints the usage text.
    usage = pexpect.spawn(base + "--help")
    usage.expect("Usage")
    usage.sendeof()
    time.sleep(1)

    # A plain topology brings up the prompt.
    plain = pexpect.spawn(base + "--topo=single,3")
    plain.expect("ravel>")
    plain.sendeof()
    time.sleep(1)

    # With --onlydb the 'm' and 'net' commands get the "no CLI" reply.
    onlydb = pexpect.spawn(base + "--topo=single,3 --onlydb")
    onlydb.expect("ravel>")
    onlydb.sendline("m")
    onlydb.sendline("net")
    onlydb.expect("no CLI available")
    onlydb.sendline("exit")
    onlydb.sendeof()
    time.sleep(1)

    # With --noctl a controller warning precedes the prompt.
    noctl = pexpect.spawn(base + "--topo single,3 --noctl")
    noctl.expect("Unable to contact the remote controller")
    noctl.expect("ravel>")
    noctl.sendline("exit")
    noctl.sendeof()
def testCommands(self):
    """Start the Ravel CLI, leave it with 'exit' and wait for it to end."""
    shell = pexpect.spawn(self.ravelCmd)
    shell.expect("ravel>")
    shell.sendline("exit")
    shell.expect(pexpect.EOF)
def testStartup(self):
    """Start Ravel with different options and check the first output of each."""
    base = "python {0} ".format(resource_file("ravel.py"))

    usage = pexpect.spawn(base + "--help")
    usage.expect("Usage")
    usage.sendeof()
    time.sleep(1)

    # (command-line options, output expected before exiting)
    startups = (
        ("--topo=single,2", "ravel>"),
        ("--topo=single,2 --onlydb", "ravel>"),
        ("--topo=single,2 --verbosity=debug", "DEBUG"),
    )
    for options, first_output in startups:
        shell = pexpect.spawn(base + options)
        shell.expect(first_output)
        shell.sendline("exit")
        shell.sendeof()
        time.sleep(1)

# Part 2 - Ravel Commands
def kernel_conda(kernel):
    """Run ``which conda`` in the test fixture kernel using Jupyter console.

    Returns the line printed after the echoed input, decoded as UTF-8.
    """
    console_args = ['console', '--kernel', kernel.name]
    console = pexpect.spawn('jupyter', console_args)
    console.expect('In.*:')
    console.sendline('!which conda')

    # The first line read back is the echo of what was typed.
    console.readline()
    # The second line is the path printed by `which`.
    conda_path = console.readline()

    console.close()
    return conda_path.decode('utf-8')
def brute(word):
    """Try ``word`` as the password at an ``su`` prompt.

    Spawns ``su``, waits for the password prompt and sends ``word``. If the
    reply matches the first pattern (index 0) rather than ``LOGIN_ERROR``,
    the word is printed, ``whoami`` is sent and control of the session is
    handed to the terminal via ``interact()``.

    NOTE(review): Python 2 syntax (print statements). This is a
    password-guessing routine; it should only ever be run on systems you
    are authorised to test.
    """
    print "Trying:",word
    child = pexpect.spawn ('su')
    child.expect ('Password: ')
    child.sendline (word)
    # Index 0: first pattern matched; index 1: LOGIN_ERROR matched.
    i = child.expect (['.+\s#\s',LOGIN_ERROR])
    if i == 0:
        print "\n\t[!] Root Password:",word
        child.sendline ('whoami')
        print child.before
        child.interact()
    #if i == 1:
        #print "Incorrect Password"
def brute(word):
    """Try ``word`` as the password for ``su <user>``.

    ``user`` is read from the enclosing scope. After sending ``word`` the
    function waits up to 5 seconds for ``LOGIN_ERROR``; if that wait times
    out instead (index 1), the word is printed, ``whoami`` is sent and
    control of the session is handed to the terminal via ``interact()``.

    NOTE(review): Python 2 syntax (print statements). This is a
    password-guessing routine; it should only ever be run on systems you
    are authorised to test.
    """
    print "Trying:",word
    child = pexpect.spawn ('su '+user)
    child.expect ('Password: ')
    child.sendline (word)
    # Index 0: LOGIN_ERROR seen; index 1: no error within the timeout.
    i = child.expect([LOGIN_ERROR, pexpect.TIMEOUT], timeout=5)
    if i == 1:
        print "\n\t[!] Password:",word
        child.sendline ('whoami')
        print child.before
        child.interact()
    #if i = 0:
        #print "Incorrect Password"
def _try_passwordless_openssh(server, keyfile):
    """Try passwordless login with shell ssh command.

    Returns True when ssh finishes without asking for anything and False
    when it asks for a password. Raises SSHException when ssh asks to
    confirm an unknown host key, and ImportError when pexpect is missing.
    """
    if pexpect is None:
        raise ImportError("pexpect unavailable, use paramiko")

    cmd = 'ssh -f ' + server
    if keyfile:
        cmd += ' -i ' + keyfile
    cmd += ' exit'

    # pop SSH_ASKPASS from env
    env = os.environ.copy()
    env.pop('SSH_ASKPASS', None)

    ssh_newkey = 'Are you sure you want to continue connecting'
    p = pexpect.spawn(cmd, env=env)
    while True:
        try:
            index = p.expect([ssh_newkey, _password_pat], timeout=.1)
        except pexpect.TIMEOUT:
            # Nothing decisive yet; keep polling.
            continue
        except pexpect.EOF:
            # ssh ended without prompting: passwordless login works.
            return True

        if index == 0:
            raise SSHException("The authenticity of the host can't be established.")
        # A password prompt was shown.
        return False
def setUp(self):
    """Start the CLI under pexpect and wait up to 3 seconds for its raw prompt."""
    launch_command = 'python {}'.format(self.cli_path)
    self.rsf = pexpect.spawn(launch_command)
    # Send an empty line, then wait for the prompt.
    self.rsf.send('\r\n')
    self.rsf.expect_exact(self.raw_prompt, timeout=3)