def check(self):
    """
    Check whether the process has exited and whether an error occurred.

    Returns None while the process has not exited yet, or a
    CompletedProcess object once it has exited without an error. If the
    process exited with an error, a CalledProcessError is raised instead.
    When the process has exited, all available stdout and stderr are
    captured into the returned object or the raised exception.
    """
    exit_status = self.poll()
    if exit_status is None:
        # Still running: there is nothing to report yet.
        return None

    captured_out, captured_err = self.communicate(timeout=0)
    result = subprocess.CompletedProcess(
        args=self.args,
        returncode=exit_status,
        stdout=captured_out,
        stderr=captured_err,
    )
    # Raises CalledProcessError (carrying the captured output) on non-zero exit.
    result.check_returncode()
    return result
def _run(*args, env=None, check=False, timeout=None): with subprocess.Popen([a.encode('utf-8') for a in args], env=env, stdout=PIPE, stderr=PIPE) as process: try: stdout, stderr = process.communicate(input, timeout=timeout) except TimeoutExpired: process.kill() stdout, stderr = process.communicate() raise TimeoutExpired( process.args, timeout, output=stdout, stderr=stderr, ) except: process.kill() process.wait() raise retcode = process.poll() if check and retcode: raise subprocess.CalledProcessError( retcode, process.args, output=stdout, stderr=stderr, ) return subprocess.CompletedProcess(process.args, retcode, stdout, stderr)
def test_verify_container_mountpoints_connection_failure(config_files, monkeypatch):
    """A failing ``lxc exec ... -- true`` must surface as a FatalError that
    names the container and carries the stderr text of the failed command."""
    def fake_lxc_exec_command(*popenargs, **kwargs):
        is_lxc_exec = get_command(popenargs) == 'lxc' and get_sub_command(popenargs) == 'exec'
        if not is_lxc_exec:
            # Everything that is not "lxc exec" runs for real.
            return subprocess.run(*popenargs, **kwargs)
        if get_command_parameter(popenargs, '--') != 'true':
            return subprocess.CompletedProcess("fakerun", 0, '')
        # Simulate the connection check failing with a message on stderr.
        failing_cmd = ['bash', '-c', '>&2 echo -e "lxc command failed" ; exit 1']
        return subprocess.run(failing_cmd, **kwargs)

    with open(config_files, "r") as main_file:
        monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_exec_command)

        coordinator = SharedFolderCoordinator(ConfigurationParser(main_file))
        with pytest.raises(FatalError) as error:
            coordinator.verify_container_mountpoints('fake-container')

        message = error.value.message
        assert 'fake-container' in message
        assert 'lxc command failed' in message
def test_verify_container_mountpoints_failure(config_files, monkeypatch):
    """A failing ``lxc exec ... -- test`` must surface as a FatalError that
    names both the container and the offending mount point."""
    def fake_lxc_exec_command(*popenargs, **kwargs):
        is_lxc_exec = get_command(popenargs) == 'lxc' and get_sub_command(popenargs) == 'exec'
        if not is_lxc_exec:
            # Everything that is not "lxc exec" runs for real.
            return subprocess.run(*popenargs, **kwargs)
        if get_command_parameter(popenargs, '--') == 'test':
            # Pretend the in-container "test" command reported a failure.
            return subprocess.CompletedProcess("failure", 1, 'failure')
        return subprocess.CompletedProcess("fakerun", 0, '')

    with open(config_files, "r") as main_file:
        monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_exec_command)

        coordinator = SharedFolderCoordinator(ConfigurationParser(main_file))
        with pytest.raises(FatalError) as error:
            coordinator.verify_container_mountpoints('fake-container')

        message = error.value.message
        assert 'fake-container' in message
        assert '/foo/bar/target_mountpoint' in message
def test_dictionary(monkeypatch, config_files, capsys, command, command_args):
    """Run the given CLI command and check the directory entries of the
    dictionary it prints as YAML on stdout."""
    def fake_lxc_config_command(*popenargs, **kwargs):
        # Only the compression-algorithm query is faked; the rest runs for real.
        if 'images.compression_algorithm' in popenargs[0]:
            return subprocess.CompletedProcess("fakerun", 0, '')
        else:
            return subprocess.run(*popenargs, **kwargs)

    monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_config_command)
    parser = edi._setup_command_line_interface()
    command_args.append(config_files)
    cli_args = parser.parse_args(command_args)
    command().run_cli(cli_args)
    out, err = capsys.readouterr()
    assert err == ''

    # yaml.load() without an explicit Loader is deprecated (PyYAML >= 5.1) and
    # a TypeError in PyYAML 6; safe_load parses the plain mapping printed here.
    dictionary = yaml.safe_load(out)
    config_directory = os.path.dirname(config_files)
    assert dictionary.get('edi_config_directory') == config_directory
    assert dictionary.get('edi_project_plugin_directory') == os.path.join(config_directory, 'plugins')
def test_config(monkeypatch, config_files, capsys, command, command_args):
    """Run the given CLI command and check the bootstrap architecture in the
    merged configuration it prints as YAML on stdout."""
    def fake_lxc_config_command(*popenargs, **kwargs):
        # Only the compression-algorithm query is faked; the rest runs for real.
        if 'images.compression_algorithm' in popenargs[0]:
            return subprocess.CompletedProcess("fakerun", 0, '')
        else:
            return subprocess.run(*popenargs, **kwargs)

    monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_config_command)
    parser = edi._setup_command_line_interface()
    command_args.append(config_files)
    cli_args = parser.parse_args(command_args)
    command().run_cli(cli_args)
    out, err = capsys.readouterr()
    assert err == ''

    # yaml.load() without an explicit Loader is deprecated (PyYAML >= 5.1) and
    # a TypeError in PyYAML 6; safe_load parses the plain mapping printed here.
    merged_config = yaml.safe_load(out)
    assert merged_config.get('bootstrap').get('architecture') == 'i386'
def test_runner_git_version(self):
    """Calling a ProcessRunner yields a subprocess.CompletedProcess whose
    stdout contains the ``git version`` banner."""
    git = ProcessRunner(['git'], workdir=str(self.tempdir))
    completed = git('version')

    self.assertIsInstance(completed, subprocess.CompletedProcess)
    self.assertIn('git version', completed.stdout)
def test_git_commit(tmpworkdir):
    # Checks that the build writes an assets.json whose "commit" value is the
    # (stripped) stdout of the patched subprocess.run call.
    # NOTE(review): the triple-quoted literals in this test appear
    # whitespace-collapsed in this view; confirm their original line breaks
    # before relying on them.
    mktree(tmpworkdir, {
        'test.js': 'X',
        'harrier.yml': """\ root: . assets: active: True"""
    })
    # Replace the subprocess.run seen by harrier.tools so no real process runs.
    with patch('harrier.tools.subprocess.run') as mock_run:
        mock_run.return_value = CompletedProcess(args=[], returncode=0, stdout=b'commit sha1\n')
        config = Config('harrier.yml')
        config.setup()
        build(config)
    assert gettree(tmpworkdir.join('build')) == {
        'test.js': 'X',
        'assets.json': """\ { "commit": "commit sha1", "files": { "test.js": "/test.js" } } """,
    }
    # The commit lookup must actually have gone through the patched call.
    assert mock_run.called
def run(self, script, target=None) -> subprocess.CompletedProcess:
    """Write *script* to disk, execute it with ``self.make_exe -f`` and
    return the finished process.

    The current pytest test is failed (via ``pytest.fail``) when the exit
    code is not in ``self.valid_exit_codes`` or when stderr contains
    ``Syntax error`` or ``ERROR``.
    """
    path, script = self.write_script(script)

    cmd = [self.make_exe, '-f', str(path)]
    if target:
        cmd = cmd + [target]

    res = subprocess.run(
        cmd,
        cwd=str(self.tmpdir),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    bad_exit = res.returncode not in self.valid_exit_codes
    bad_output = any(marker in res.stderr for marker in (b'Syntax error', b'ERROR'))
    if bad_exit or bad_output:
        pytest.fail(self.format_error(script, res))
    return res
def update_completed(self):
    """
    Run ``nbh course-update-from-git <coursename>`` with stdout and stderr
    captured, and return the resulting subprocess.CompletedProcess instance.
    """
    return subprocess.run(
        ["nbh", "course-update-from-git", self.coursename],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
def mocked_stdout(stdout):
    """Build a successful CompletedProcess whose stdout is *stdout* and whose
    stderr is None, for use as a canned subprocess result."""
    return subprocess.CompletedProcess(
        args=['blah', 'args'],
        returncode=0,
        stdout=stdout,
        stderr=None,
    )
def run(
    self,
    args: List[str],
    user: str,
    log_output_live: bool = False,
    env: Optional[Dict] = None,
) -> CompletedProcess:
    """
    Run a command on this node as the given user.

    Args:
        args: The command to run on the node.
        user: The username to SSH as.
        log_output_live: If `True`, log output live. If `True`, stderr is
            merged into stdout in the return value.
        env: Environment variables to be set on the node before running
            the command. A mapping of environment variable names to
            values.

    Returns:
        The representation of the finished process.

    Raises:
        CalledProcessError: The process exited with a non-zero code.
    """
    # Wrap the command into the full SSH invocation first, then execute it.
    full_command = self._compose_ssh_command(args=args, user=user, env=env)
    return run_subprocess(
        args=full_command,
        log_output_live=log_output_live,
    )
def test_lxd_connection(config_files, monkeypatch):
    """With the "lxd" connection, every ansible-playbook call must use
    ``--connection lxd`` and the playbooks must be reported in order."""
    def fake_ansible_playbook_run(*popenargs, **kwargs):
        if get_command(popenargs) != 'ansible-playbook':
            # Anything else runs for real.
            return subprocess.run(*popenargs, **kwargs)

        assert 'lxd' == get_command_parameter(popenargs, '--connection')
        verify_inventory(get_command_parameter(popenargs, '--inventory'))
        verify_extra_vars(get_command_parameter(popenargs, '--extra-vars').lstrip('@'))
        # TODO: verify --user for ssh connection
        return subprocess.CompletedProcess("fakerun", 0, '')

    monkeypatch.setattr(mockablerun, 'run_mockable', fake_ansible_playbook_run)
    # Ownership changes are turned into a no-op for the duration of the test.
    monkeypatch.setattr(shutil, 'chown', lambda *_: None)

    with open(config_files, "r") as main_file:
        runner = PlaybookRunner(ConfigurationParser(main_file), "fake-container", "lxd")
        playbooks = runner.run_all()

        assert playbooks == ['10_base_system', '20_networking', '30_foo']
def test_verify_container_mountpoints(config_files, monkeypatch):
    """Verification passes without raising when every ``lxc exec`` call succeeds."""
    def fake_lxc_exec_command(*popenargs, **kwargs):
        is_lxc_exec = get_command(popenargs) == 'lxc' and get_sub_command(popenargs) == 'exec'
        if is_lxc_exec:
            return subprocess.CompletedProcess("fakerun", 0, '')
        # Everything that is not "lxc exec" runs for real.
        return subprocess.run(*popenargs, **kwargs)

    with open(config_files, "r") as main_file:
        monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_exec_command)

        coordinator = SharedFolderCoordinator(ConfigurationParser(main_file))
        coordinator.verify_container_mountpoints('fake-container')
def test_create_host_folders_successful_create(config_files, monkeypatch):
    """Host folders that do not exist yet are created through ``mkdir -p``."""
    def fake_mkdir_command(*popenargs, **kwargs):
        is_mkdir_p = get_command(popenargs) == 'mkdir' and get_sub_command(popenargs) == '-p'
        if not is_mkdir_p:
            # Anything else runs for real.
            return subprocess.run(*popenargs, **kwargs)
        # The folder to create is the last element of the command list.
        folder = popenargs[0][-1]
        assert 'valid_folder' in folder or 'work' in folder
        return subprocess.CompletedProcess("fakerun", 0, '')

    with open(config_files, "r") as main_file:
        coordinator = SharedFolderCoordinator(ConfigurationParser(main_file))

        # Make every path look absent so the coordinator has to create it.
        monkeypatch.setattr(os.path, 'isdir', lambda *_: False)
        monkeypatch.setattr(os.path, 'exists', lambda *_: False)
        monkeypatch.setattr(mockablerun, 'run_mockable', fake_mkdir_command)

        coordinator.create_host_folders()  # successful mkdir
def test_target_configure(config_files, monkeypatch, capsys):
    """Initialise a fresh project with the edi executable and run
    ``target configure`` against it with ansible-playbook faked out;
    nothing may be written to stderr."""
    def fakerun(*popenargs, **kwargs):
        if get_command(popenargs) != "ansible-playbook":
            print('Passthrough: {}'.format(get_command(popenargs)))
            return subprocess.run(*popenargs, **kwargs)
        return subprocess.CompletedProcess("fakerun", 0, '')

    monkeypatch.setattr(mockablerun, 'run_mockable', fakerun)
    suppress_chown_during_debuild(monkeypatch)

    with workspace():
        project_name = 'pytest-{}'.format(get_random_string(6))
        edi_exec = os.path.join(get_project_root(), 'bin', 'edi')
        run([edi_exec, 'config', 'init', project_name, 'debian-jessie-amd64'])  # run as non root

        cli_parser = edi._setup_command_line_interface()
        configure_args = ['target', 'configure', 'remote-target', '{}-develop.yml'.format(project_name)]
        cli_args = cli_parser.parse_args(configure_args)

        Configure().run_cli(cli_args)

        out, err = capsys.readouterr()
        print(out)
        assert not err
def popen(self, std_in=None, environ=None):
    """Start ``self.command`` through the shell and return the process handle.

    In dry-run mode nothing is executed: the placeholder output is stored on
    the instance, the command is logged, and a successful
    ``subprocess.CompletedProcess`` with empty ``args`` is returned instead.
    Otherwise a ``subprocess.Popen`` object with piped stdout/stderr is
    returned; this method does not wait for it.
    """
    self.__verify_command()

    if self.dry_run:
        placeholder = self._DRY_RUN_OUTPUT
        self.__stdout = placeholder
        self.__stderr = placeholder
        self.__returncode = 0
        self.__debug_logging_method("dry-run: {}".format(self.command))
        return subprocess.CompletedProcess(
            args=[],
            returncode=self.__returncode,
            stdout=self.__stdout,
            stderr=self.__stderr,
        )

    self.__debug_print_command()

    stream_options = dict(shell=True, stdin=std_in, stdout=PIPE, stderr=PIPE)
    try:
        process = subprocess.Popen(
            self.command, env=self.__get_env(environ), **stream_options)
    except TypeError:
        # Retry without a custom environment when building or passing it
        # raised a TypeError.
        process = subprocess.Popen(self.command, **stream_options)
    return process
def ex_subprocess(cmd):
    """Run a command with captured output and report the outcome as a tuple.

    This wrapper is best-effort: ordinary failures are reported through the
    return value rather than raised.

    Arguments:
        - cmd (list(str)): the command to feed the subprocess

    Return:
        (out, compl_proc, error_proc) (3-tuple):
        - out (int): 1 if the subprocess failed, 0 if it succeeded
        - compl_proc (subprocess.CompletedProcess instance): result of the
          subprocess when it succeeded, otherwise None
        - error_proc (subprocess.CalledProcessError instance): the error
          raised when the subprocess exited with a non-zero code, otherwise
          None (also None when the command could not be started at all)
    """
    try:
        compl_proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
    except subprocess.CalledProcessError as error:
        return 1, None, error
    except Exception:
        # Keep the historical best-effort contract (e.g. command not found
        # yields (1, None, None)). The previous ``return`` inside ``finally``
        # additionally swallowed KeyboardInterrupt/SystemExit; those now
        # propagate as they should.
        return 1, None, None
    return 0, compl_proc, None
def retrieve_stdout(self):
    """Return the stdout of the stored completed process as text.

    Reads ``self.compl_proc`` (the object returned once ``subprocess.run()``
    has completed), decodes its stdout bytes as UTF-8 and strips trailing
    whitespace.
    """
    raw_output = self.compl_proc.stdout
    text = raw_output.decode('UTF-8')
    return text.rstrip()
def runCmd(*args, captureOutput=False, captureError=False, input: "typing.Union[str, bytes]"=None, timeout=None,
           printVerboseOnly=False, runInPretendMode=False, **kwargs):
    """
    Print and run a command, returning a CompletedProcess.

    The command is given either as a single list/tuple or as separate
    positional arguments; every element is converted with ``str``.

    :param captureOutput: pipe the child's stdout so it ends up in the result
    :param captureError: pipe the child's stderr so it ends up in the result
    :param input: data for the child's stdin; anything that is not ``bytes``
        is converted with ``str`` and encoded as UTF-8
    :param timeout: passed to ``communicate``; on expiry the child is killed
        and ``subprocess.TimeoutExpired`` (with the captured stdout) is raised
    :param printVerboseOnly: forwarded to ``printCommand``
    :param runInPretendMode: run the command even when ``_cheriConfig.pretend``
        is set; otherwise pretend mode returns a successful result with empty
        output without running anything
    :param kwargs: forwarded to ``popen_handle_noexec``; ``cwd`` and the
        values of ``env`` are converted to strings first
    :return: a CompletedProcess with the process args, return code and the
        captured stdout/stderr
    """
    if len(args) == 1 and isinstance(args[0], (list, tuple)):
        cmdline = args[0]  # list with parameters was passed
    else:
        cmdline = args
    cmdline = list(map(str, cmdline))  # ensure it's all strings so that subprocess can handle it
    # When running scripts from a noexec filesystem try to read the interpreter and run that
    printCommand(cmdline, cwd=kwargs.get("cwd"), env=kwargs.get("env"), printVerboseOnly=printVerboseOnly)
    if "cwd" in kwargs:
        kwargs["cwd"] = str(kwargs["cwd"])
    else:
        # os.getcwd() raises an exception if the cwd was deleted
        try:
            kwargs["cwd"] = os.getcwd()
        except FileNotFoundError:
            kwargs["cwd"] = tempfile.gettempdir()
    # Pretend mode: report success without spawning anything.
    if not runInPretendMode and _cheriConfig.pretend:
        return CompletedProcess(args=cmdline, returncode=0, stdout=b"", stderr=b"")
    # actually run the process now:
    if input is not None:
        assert "stdin" not in kwargs  # we need to use stdin here
        kwargs['stdin'] = subprocess.PIPE
        if not isinstance(input, bytes):
            input = str(input).encode("utf-8")
    if captureOutput:
        assert "stdout" not in kwargs  # we need to use stdout here
        kwargs["stdout"] = subprocess.PIPE
    if captureError:
        assert "stderr" not in kwargs  # we need to use stderr here
        kwargs["stderr"] = subprocess.PIPE
    # NOTE(review): this branch hangs off ``if captureError``, so quiet mode only
    # silences stdout when stderr is NOT being captured -- confirm this is intended.
    elif _cheriConfig.quiet and "stdout" not in kwargs:
        kwargs["stdout"] = subprocess.DEVNULL
    if "env" in kwargs:
        # Environment values must be strings for the child process.
        kwargs["env"] = dict((k, str(v)) for k, v in kwargs["env"].items())
    with popen_handle_noexec(cmdline, **kwargs) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except subprocess.TimeoutExpired:
            # Kill the child, then collect whatever output it produced so far.
            process.kill()
            stdout, stderr = process.communicate()
            # TODO py35: pass stderr=stderr as well
            raise subprocess.TimeoutExpired(process.args, timeout, output=stdout)
        except Exception as e:
            # Never leave the child running when communicate() fails; re-raise unchanged.
            process.kill()
            process.wait()
            raise
        retcode = process.poll()
        if retcode:
            # Non-zero exit: report through fatalError() in pretend mode, otherwise raise.
            if _cheriConfig and _cheriConfig.pretend:
                cwd = (". Working directory was ", kwargs["cwd"]) if "cwd" in kwargs else ()
                fatalError("Command ", "`" + " ".join(map(shlex.quote, process.args)) +
                           "` failed with non-zero exit code ", retcode, *cwd, sep="")
            else:
                raise _make_called_process_error(retcode, process.args, stdout=stdout, cwd=kwargs["cwd"])
        return CompletedProcess(process.args, retcode, stdout, stderr)
def run_integration_tests(
    self,
    pytest_command: List[str],
    env: Optional[Dict] = None,
    log_output_live: bool = False,
) -> subprocess.CompletedProcess:
    """
    Run integration tests on one master node.

    Args:
        pytest_command: The ``pytest`` command to run on the node.
        env: Environment variables to be set on the node before running
            the `pytest_command`. On enterprise clusters,
            `DCOS_LOGIN_UNAME` and `DCOS_LOGIN_PW` must be set.
        log_output_live: If `True`, log output of the `pytest_command`
            live. If `True`, stderr is merged into stdout in the return
            value.

    Returns:
        The result of the ``pytest`` command.

    Raises:
        ``subprocess.CalledProcessError`` if the ``pytest`` command fails.
    """
    def ip_addresses(nodes: Iterable[Node]) -> str:
        # Comma-separated list of the nodes' IP addresses.
        return ','.join(str(node.ip_address) for node in nodes)

    # Caller-supplied variables come last so they override the host lists.
    environment_variables = {
        'MASTER_HOSTS': ip_addresses(self.masters),
        'SLAVE_HOSTS': ip_addresses(self.agents),
        'PUBLIC_SLAVE_HOSTS': ip_addresses(self.public_agents),
        **(env or {}),
    }

    # Load the DC/OS environment and enter the test directory before pytest.
    command = [
        'source',
        '/opt/mesosphere/environment.export',
        '&&',
        'cd',
        '/opt/mesosphere/active/dcos-integration-test/',
        '&&',
        *pytest_command,
    ]

    # The tests run on the first master yielded by iterating the masters.
    test_host = next(iter(self.masters))
    return test_host.run(
        args=command,
        user=self.default_ssh_user,
        log_output_live=log_output_live,
        env=environment_variables,
    )
def test_bootstrap(config_files, monkeypatch):
    # Runs Bootstrap twice with all external commands faked: the first run must
    # produce the result file, the second run must leave an existing result
    # file untouched.
    with open(config_files, "r") as main_file:
        # Report uid 0 to the code under test.
        def fakegetuid():
            return 0
        monkeypatch.setattr(os, 'getuid', fakegetuid)

        # Ownership changes become a no-op.
        def fakechown(*_):
            pass
        monkeypatch.setattr(shutil, 'chown', fakechown)

        # Stand-in for mockablerun.run_mockable: emulates the file system side
        # effects of the commands below and passes unknown commands through.
        def fakerun(*popenargs, **kwargs):
            if get_command(popenargs) == "chroot":
                # Make sure the chroot target directory exists.
                rootfs_path = get_command_parameter(popenargs, "chroot")
                if not os.path.exists(rootfs_path):
                    os.mkdir(rootfs_path)
            elif get_command(popenargs) == "debootstrap":
                # The rootfs path is the second to last argument; create etc/apt in it.
                rootfs_path = popenargs[0][-2]
                apt_dir = os.path.join(rootfs_path, 'etc', 'apt')
                os.makedirs(apt_dir)
                pass
            elif get_command(popenargs) == "tar":
                # Write a placeholder file where the archive would be created.
                archive = get_command_parameter(popenargs, '-acf')
                with open(archive, mode="w") as fakearchive:
                    fakearchive.write("fake archive")
            elif popenargs[0][-2] == "dpkg" and popenargs[0][-1] == "--print-architecture":
                return subprocess.CompletedProcess("fakerun", 0, 'amd64')
            elif get_command(popenargs) == "lxd" and get_sub_command(popenargs) == "--version":
                return subprocess.CompletedProcess("fakerun", 0, '2.18')
            elif get_command(popenargs) == "printenv":
                if get_sub_command(popenargs) == "HOME":
                    return subprocess.CompletedProcess("fakerun", 0, '/no/such/directory')
                else:
                    return subprocess.CompletedProcess("fakerun", 0, '')
            else:
                print('Passthrough: {}'.format(get_command(popenargs)))
                return subprocess.run(*popenargs, **kwargs)

            # Branches that only emulate side effects report success with empty output.
            return subprocess.CompletedProcess("fakerun", 0, '')

        monkeypatch.setattr(mockablerun, 'run_mockable', fakerun)

        monkeypatch.chdir(os.path.dirname(config_files))

        bootstrap_cmd = Bootstrap()
        # Serve a mock key file instead of fetching it over the network.
        with requests_mock.Mocker() as m:
            m.get('https://ftp-master.debian.org/keys/archive-key-8.asc', text='key file mockup')
            bootstrap_cmd.run(main_file)

        expected_result = bootstrap_cmd._result()
        assert os.path.exists(expected_result)

        # Overwrite the result with a marker, then run again: the marker must survive.
        previous_result_text = "previous result"
        with open(expected_result, mode="w") as previous_result:
            previous_result.write(previous_result_text)

        bootstrap_cmd2 = Bootstrap()
        bootstrap_cmd2.run(main_file)

        with open(expected_result, mode="r") as same_result:
            assert same_result.read() == previous_result_text
def test_plugins(monkeypatch, config_files, capsys, command, command_args, has_templates,
                 has_profiles, has_playbooks, has_postprocessing_commands):
    """Run the given CLI command and check which plugin sections appear in
    the YAML it prints on stdout, according to the ``has_*`` expectations."""
    def fake_lxc_config_command(*popenargs, **kwargs):
        # Only the compression-algorithm query is faked; the rest runs for real.
        if 'images.compression_algorithm' in popenargs[0]:
            return subprocess.CompletedProcess("fakerun", 0, '')
        else:
            return subprocess.run(*popenargs, **kwargs)

    monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_config_command)
    parser = edi._setup_command_line_interface()
    command_args.append(config_files)
    cli_args = parser.parse_args(command_args)
    command().run_cli(cli_args)
    out, err = capsys.readouterr()
    assert err == ''

    # yaml.load() without an explicit Loader is deprecated (PyYAML >= 5.1) and
    # a TypeError in PyYAML 6; safe_load parses the plain mapping printed here.
    result = yaml.safe_load(out)

    if has_templates:
        assert result.get('lxc_templates')
    else:
        assert not result.get('lxc_templates')

    if has_profiles:
        assert result.get('lxc_profiles')
    else:
        assert not result.get('lxc_profiles')

    if has_playbooks:
        assert len(result.get('playbooks')) == 3
        base_system = result.get('playbooks')[0].get('10_base_system')
        assert 'plugins/playbooks/foo.yml' in base_system.get('path')
        assert base_system.get('dictionary').get('kernel_package') == 'linux-image-amd64-rt'
        assert base_system.get('dictionary').get('edi_config_directory') == os.path.dirname(config_files)
    else:
        assert not result.get('playbooks')

    if has_postprocessing_commands:
        assert result.get('postprocessing_commands')
    else:
        assert not result.get('postprocessing_commands')