Python subprocess 模块,SubprocessError() 实例源码

我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用subprocess.SubprocessError()

项目:knit    作者:dask    | 项目源码 | 文件源码
def kill(self, app_id):
        """
        Ask YARN to kill a running application.

        Parameters
        ----------
        app_id: str
            YARN application id

        Returns
        -------
        bool:
            True when the command output contains "Killed application";
            False when it does not, or when running the command raises
            SubprocessError.
        """
        kill_command = ["yarn", "application", "-kill", app_id]
        try:
            output = shell_out(kill_command, stderr=STDOUT)
        except SubprocessError:
            return False
        # Success is detected from the text printed by the yarn CLI.
        return "Killed application" in output
项目:camisole    作者:prologin    | 项目源码 | 文件源码
def find_class_having_main(self, classes):
        """Return the stem of the first class file that declares ``main``.

        Each path in ``classes`` is disassembled with the configured
        'disassembler' binary (``-s`` prints type signatures). Files whose
        disassembly fails are skipped.

        Returns:
            ``file.stem`` of the first file whose ``public static ... void
            main(`` line is followed by ``PSVMAIN_DESCRIPTOR``, or None when
            no file matches.
        """
        for file in classes:
            # run javap(1) with type signatures
            try:
                stdout = subprocess.check_output(
                    [self.extra_binaries['disassembler'].cmd, '-s', str(file)],
                    stderr=subprocess.DEVNULL, env=self.compiler.env)
            except subprocess.SubprocessError:  # noqa
                continue
            # iterate on lines to find p s v main() signature and then
            # its descriptor on the line below; we don't rely on the type
            # from the signature, because it could be String[], String... or
            # some other syntax I'm not even aware of
            lines = iter(stdout.decode().split('\n'))
            for line in lines:
                line = line.lstrip()
                if line.startswith('public static') and 'void main(' in line:
                    # Default to '' so a signature on the very last line does
                    # not leak StopIteration out of this function.
                    if next(lines, '').lstrip() == PSVMAIN_DESCRIPTOR:
                        return file.stem
        return None
项目:mobile-monkey    作者:LordAmit    | 项目源码 | 文件源码
def adb_start_server_safe():
    '''
    checks if `adb server` is running. if not, starts it.

    Returns True when adb was already running or was started successfully,
    and False when starting it failed (including when the adb executable
    cannot be launched at all).
    '''
    try:
        status = subprocess.check_output(['pidof', ADB])
        util.debug_print('adb already running in PID: ' +
                         status.decode(), flag=PRINT_FLAG)
        return True
    except subprocess.CalledProcessError as exception:
        print('adb is not running, returned status: ' +
              str(exception.returncode))

        print('adb was not started. starting...')

        try:
            subprocess.check_output([ADB, 'start-server'])

            return True
        # A missing or non-executable binary raises OSError
        # (e.g. FileNotFoundError), which is not a SubprocessError.
        except (OSError, subprocess.SubprocessError):
            print(
                'something disastrous happened. maybe ' + ADB + ' was not found')
            return False
项目:knesset-data-pipelines    作者:hasadna    | 项目源码 | 文件源码
def _parse_rtf_protocol(self, committee_id, meeting_id, bucket, protocol_object_name, parts_object_name, text_object_name):
        """Placeholder for RTF protocol parsing; always returns False.

        All arguments are currently ignored.
        """
        # With the new API we no longer seem to receive rtf files: files that
        # used to be rtf appear to actually be doc. Needs further
        # investigation before rtf parsing is re-enabled.
        return False
        # rtf_extractor = os.environ.get("RTF_EXTRACTOR_BIN")
        # if rtf_extractor:
        #     with object_storage.temp_download(protocol_object_name) as protocol_filename:
        #         with tempfile.NamedTemporaryFile() as text_filename:
        #             cmd = rtf_extractor + ' ' + protocol_filename + ' ' + text_filename
        #             try:
        #                 subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
        #                 protocol_text = fs.read(text_filename)
        #                 with CommitteeMeetingProtocol.get_from_text(protocol_text) as protocol:
        #                     self._parse_protocol_parts(parts_filename, protocol)
        #             except subprocess.SubprocessError:
        #                 logging.exception("committee {} meeting {}: failed to parse rtf file, skipping".format(committee_id,
        #                                                                                                        meeting_id))
        #                 return False
        #             return True
        # else:
        #     logging.warning("missing RTF_EXTRACTOR_BIN environment variable, skipping rtf parsing")
        #     return False
项目:ddmbot    作者:Budovi    | 项目源码 | 文件源码
def _spawn_ffmpeg(self):
        """Launch ffmpeg for the current source and keep it in ``self._ffmpeg``.

        The URL comes from ``self._stream_url`` while streaming, or from
        ``self._song_context.song_url`` while playing; it is shell-quoted and
        substituted into ``self._ffmpeg_command``.

        Raises:
            RuntimeError: if the player is neither streaming nor playing, if
                the executable is not found, or if Popen raises a
                SubprocessError.
        """
        if self.streaming:
            source_url = self._stream_url
        elif self.playing:
            source_url = self._song_context.song_url
        else:
            raise RuntimeError('Player is in an invalid state')

        command_line = self._ffmpeg_command.format(shlex.quote(source_url))
        argv = shlex.split(command_line)
        try:
            self._ffmpeg = subprocess.Popen(argv)
        except FileNotFoundError as exc:
            raise RuntimeError('ffmpeg executable was not found') from exc
        except subprocess.SubprocessError as exc:
            raise RuntimeError('Popen failed: {0.__name__} {1}'.format(type(exc), str(exc))) from exc

    #
    # Player FSM
    #
项目:tppocr    作者:chfoo    | 项目源码 | 文件源码
def _terminate_ffmpeg(self):
        """Stop the tracked ffmpeg process, if any, and forget it.

        Sends terminate and waits up to 5 seconds (failures are logged), then
        always attempts a kill (failures are ignored) before clearing
        ``self._current_proc``.
        """
        process = self._current_proc
        if not process:
            return

        _logger.info('Terminating ffmpeg...')

        # Polite shutdown first; a timeout surfaces as a SubprocessError.
        try:
            process.terminate()
            process.wait(5)
        except (OSError, subprocess.SubprocessError):
            _logger.exception('Terminate ffmpeg')

        # Unconditional best-effort kill.
        try:
            process.kill()
        except (OSError, subprocess.SubprocessError):
            pass

        self._current_proc = None
项目:odl-video-service    作者:mitodl    | 项目源码 | 文件源码
def verify_s3_bucket_exists(s3_bucket_name):
    """
    Check whether S3 bucket exists

    Args:
      s3_bucket_name (str): The s3 bucket name

    Returns:
      None: returns normally when `aws s3api head-bucket` succeeds;
        otherwise the error is logged and the process exits.
    """
    # Without shell=True the command must be an argument list: a single
    # string would be looked up as the executable name itself on POSIX.
    head_bucket_cmd = ['aws', 's3api', 'head-bucket', '--bucket', s3_bucket_name]
    try:
        subprocess.run(head_bucket_cmd, check=True,
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # OSError covers the case where the aws executable cannot be launched.
    except (OSError, subprocess.SubprocessError):
        logger.exception("Failed to list specified s3 bucket: {}", s3_bucket_name)
        sys.exit("[-] Failed to list specified s3 bucket")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_surrogates_error_message(self):
        """A preexec_fn error message containing a lone surrogate is handled.

        Either the ValueError itself propagates (expected only when
        ``subprocess._posixsubprocess`` is None) or a SubprocessError with the
        default message is raised.
        """
        def failing_preexec():
            raise ValueError("surrogate:\uDCff")

        try:
            subprocess.call(
                [sys.executable, "-c", "pass"],
                preexec_fn=failing_preexec)
        except ValueError as exc:
            # Pure Python implementations keeps the message
            self.assertIsNone(subprocess._posixsubprocess)
            self.assertEqual(str(exc), "surrogate:\uDCff")
        except subprocess.SubprocessError as exc:
            # _posixsubprocess uses a default message
            self.assertIsNotNone(subprocess._posixsubprocess)
            self.assertEqual(str(exc), "Exception occurred in preexec_fn.")
        else:
            self.fail("Expected ValueError or subprocess.SubprocessError")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_surrogates_error_message(self):
        """A preexec_fn error message containing a lone surrogate is handled.

        Either the ValueError itself propagates (expected only when
        ``subprocess._posixsubprocess`` is None) or a SubprocessError with the
        default message is raised.
        """
        def failing_preexec():
            raise ValueError("surrogate:\uDCff")

        try:
            subprocess.call(
                [sys.executable, "-c", "pass"],
                preexec_fn=failing_preexec)
        except ValueError as exc:
            # Pure Python implementations keeps the message
            self.assertIsNone(subprocess._posixsubprocess)
            self.assertEqual(str(exc), "surrogate:\uDCff")
        except subprocess.SubprocessError as exc:
            # _posixsubprocess uses a default message
            self.assertIsNotNone(subprocess._posixsubprocess)
            self.assertEqual(str(exc), "Exception occurred in preexec_fn.")
        else:
            self.fail("Expected ValueError or subprocess.SubprocessError")
项目:mbot    作者:michaelkuty    | 项目源码 | 文件源码
def install_package(package, upgrade=True,
                    target=None):
    """Install a package from PyPI using ``python -m pip``.

    Accepts pip compatible package strings. Returns True when the package
    already exists or pip exits with status 0, False otherwise.
    """
    # pip is run as a subprocess because 'import pip; pip.main([])'
    # breaks the logger
    with INSTALL_LOCK:
        if check_package_exists(package, target):
            return True

        _LOGGER.info('Attempting install of %s', package)
        pip_cmd = [sys.executable, '-m', 'pip', 'install', '--quiet', package]
        if upgrade:
            pip_cmd += ['--upgrade']
        if target:
            pip_cmd.extend(('--target', os.path.abspath(target)))

        try:
            exit_code = subprocess.call(pip_cmd)
        except subprocess.SubprocessError:
            _LOGGER.exception('Unable to install pacakge %s', package)
            return False
        return exit_code == 0
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def install_package(package: str, upgrade: bool=True,
                    target: Optional[str]=None) -> bool:
    """Install a package from PyPI using ``python -m pip``.

    Accepts pip compatible package strings.

    Returns True when the package already exists or pip exits with
    status 0, False otherwise.
    """
    # pip is run as a subprocess because 'import pip; pip.main([])'
    # breaks the logger
    with INSTALL_LOCK:
        if check_package_exists(package, target):
            return True

        _LOGGER.info('Attempting install of %s', package)
        pip_cmd = [sys.executable, '-m', 'pip', 'install', '--quiet', package]
        if upgrade:
            pip_cmd += ['--upgrade']
        if target:
            pip_cmd.extend(('--target', os.path.abspath(target)))

        try:
            exit_code = subprocess.call(pip_cmd)
        except subprocess.SubprocessError:
            _LOGGER.exception('Unable to install pacakge %s', package)
            return False
        return exit_code == 0
项目:IgDiscover    作者:NBISweden    | 项目源码 | 文件源码
def makeblastdb(fasta, database_name):
    """Build a nucleotide BLAST database from a FASTA file.

    Runs ``makeblastdb -parse_seqids -dbtype nucl`` on ``fasta`` and writes
    the database to ``database_name``.

    Raises:
        ValueError: if the FASTA file contains no sequences.
        subprocess.CalledProcessError: if makeblastdb exits non-zero
            (raised by ``check_output``).
        subprocess.SubprocessError: if the tool's combined output contains
            ``Error: ``; the output is included in the message.
    """
    with SequenceReader(fasta) as fr:
        sequences = list(fr)
    if not sequences:
        raise ValueError("FASTA file {} is empty".format(fasta))
    process_output = subprocess.check_output(
        ['makeblastdb', '-parse_seqids', '-dbtype', 'nucl', '-in', fasta, '-out', database_name],
        stderr=subprocess.STDOUT
    )
    # The output is scanned as well as the exit status; keep the tool's
    # output in the exception so the failure can be diagnosed.
    if b'Error: ' in process_output:
        raise subprocess.SubprocessError(
            "makeblastdb reported an error for {}: {}".format(
                fasta, process_output.decode(errors='replace')))
项目:core-workflow    作者:python    | 项目源码 | 文件源码
def is_cpython_repo():
    """Return True when ``git log`` can resolve a fixed, hard-coded commit.

    Returns False when the git command fails or when git cannot be
    launched at all.
    """
    cmd = "git log -r 7f777ed95a19224294949e1b4ce56bbffcb1fe9f"
    try:
        subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
    # OSError (e.g. FileNotFoundError) is raised when git is not installed;
    # it is not a SubprocessError.
    except (OSError, subprocess.SubprocessError):
        return False
    return True
项目:mobile-monkey    作者:LordAmit    | 项目源码 | 文件源码
def adb_install_apk(emulator: Emulator, apk: Apk):
    '''
    Installs ``apk.apk_path`` on the emulator listening on ``emulator.port``
    via ``adb install``.

    Raises ValueError when the adb command raises a SubprocessError.
    '''
    util.check_file_directory_exists(apk.apk_path, True)
    install_cmd = [config.adb, '-s', 'emulator-' + emulator.port,
                   'install', apk.apk_path]
    try:
        output = subprocess.check_output(install_cmd).decode()
        util.debug_print(output, flag=PRINT_FLAG)
    except subprocess.SubprocessError as error:
        print(error)
        raise ValueError("error installing.")
项目:mobile-monkey    作者:LordAmit    | 项目源码 | 文件源码
def adb_uninstall_package(emulator: Emulator, package: str):
    '''
    Runs ``adb uninstall`` for ``package`` on the given emulator and prints
    the outcome; a failing adb command is reported, not raised.
    '''
    # if adb_is_package_present(emulator, package) is not 1:
    #     raise ValueError("Package not found / Too generic.")
    uninstall_cmd = [config.adb, '-s', 'emulator-' + emulator.port,
                     'uninstall', package]
    try:
        subprocess.check_output(uninstall_cmd)
        print("uninstalled " + package)
    except subprocess.SubprocessError:
        print("maybe not found/uninstalled already")
项目:mobile-monkey    作者:LordAmit    | 项目源码 | 文件源码
def adb_pidof_app(emulator: Emulator, apk: Apk):
    '''
    Returns the first line printed by ``adb shell pidof`` for
    ``apk.package_name``.

    When the adb command raises a SubprocessError a message is printed and
    None is returned implicitly.
    '''
    pidof_cmd = [config.adb, '-s', 'emulator-' + emulator.port,
                 'shell', 'pidof', apk.package_name]
    try:
        raw_output = subprocess.check_output(pidof_cmd)
        first_line = raw_output.decode().split('\n')[0]
        util.debug_print(first_line, flag=PRINT_FLAG)
        return first_line
    except subprocess.SubprocessError:
        print("maybe not found/uninstalled already")
项目:mobile-monkey    作者:LordAmit    | 项目源码 | 文件源码
def reset_all(self):
        '''
            Resets the rotation settings by calling, in this order:
            set_accelerometer_rotation(False),
            set_user_rotation(UserRotation.ROTATION_POTRAIT),
            set_accelerometer_rotation(True).

            NOTE(review): earlier notes for this method also listed
            "airplane_mode_off", but no airplane-mode call is made here --
            confirm whether that is intended.
        '''
        self.set_accelerometer_rotation(False)
        self.set_user_rotation(UserRotation.ROTATION_POTRAIT)
        self.set_accelerometer_rotation(True)

    # def adb_start_server_safe(self):
    #     '''
    #     checks if `adb server` is running. if not, starts it.
    #     '''
    #     try:
    #         status = subprocess.check_output(['pidof', ADB])
    #         print('adb already running in PID: ' + status.decode())
    #         return True
    #     except subprocess.CalledProcessError as exception:
    #         print('adb is not running, returned status: ' +
    #               str(exception.returncode))

    #         print('adb was not started. starting...')

    #         try:
    #             subprocess.check_output([ADB, 'start-server'])
    #             return True
    #         except subprocess.SubprocessError as exception:
    #             print('something disastrous happened. maybe ' +
    #                   ADB + ' was not found')
    #             return False
项目:Squid-Plugins    作者:tekulvw    | 项目源码 | 文件源码
def create_ffmpeg_player(self, filename, *, use_avconv=False, pipe=False,
                             options=None, before_options=None, headers=None,
                             after=None):
        """
        Stolen from Rapptz/Danny, thanks!

        Builds an ffmpeg (or avconv) command line that decodes ``filename``
        to s16le audio on stdout, starts it and wraps the process in a
        ProcessPlayer. With ``pipe=True`` the input is read from stdin, and
        ``filename`` is passed to Popen as that stdin.

        Raises ClientException when the executable is missing or Popen
        raises a SubprocessError.
        """
        executable = 'avconv' if use_avconv else 'ffmpeg'
        source = '-' if pipe else shlex.quote(filename)

        # Arguments placed before "-i": optional HTTP headers, then any
        # caller-supplied options.
        leading_args = ""
        if isinstance(headers, dict):
            header_blob = "".join(
                "{}: {}\r\n".format(key, value)
                for key, value in headers.items())
            leading_args = ' -headers ' + shlex.quote(header_blob)
        if isinstance(before_options, str):
            leading_args += ' ' + before_options

        command_line = (
            executable + '{} -i {} -f s16le -ar {} -ac {} -loglevel warning'
        ).format(leading_args, source, self.encoder.sampling_rate,
                 self.encoder.channels)
        if isinstance(options, str):
            command_line += ' ' + options
        command_line += ' pipe:1'

        process_stdin = filename if pipe else None
        argv = shlex.split(command_line)
        try:
            process = subprocess.Popen(argv, stdin=process_stdin,
                                       stdout=subprocess.PIPE)
            return ProcessPlayer(process, self, after)
        except FileNotFoundError as exc:
            raise ClientException('ffmpeg/avconv was not found in your PATH'
                                  ' environment variable') from exc
        except subprocess.SubprocessError as exc:
            raise ClientException(
                'Popen failed: {0.__name__} {1}'.format(type(exc), str(exc))) \
                from exc
项目:danube-delta    作者:honzajavorek    | 项目源码 | 文件源码
def lint(context):
    """Looks for errors in source code of your blog"""

    settings = context.obj
    # flake8 is run over the configured working directory, skipping EXCLUDE.
    command = 'flake8 {dir} --exclude={exclude}'.format(
        dir=settings['CWD'],
        exclude=','.join(EXCLUDE),
    )
    try:
        run(command)
    except SubprocessError:
        # A failed run ends the context with exit status 1.
        context.exit(1)
项目:knesset-data-pipelines    作者:hasadna    | 项目源码 | 文件源码
def _parse_doc_protocol(self, committee_id, meeting_id, bucket, protocol_object_name, parts_object_name, text_object_name):
        """Parse a .doc protocol stored in object storage.

        Downloads ``protocol_object_name`` to a temporary file, writes the
        protocol text to ``text_object_name`` and hands the protocol to
        ``_parse_protocol_parts``.

        Returns True on success; False (after logging) when an
        AntiwordException, SubprocessError or XML ParseError is raised.
        """
        logging.info("parsing doc protocol {} --> {}, {}".format(protocol_object_name, parts_object_name, text_object_name))
        with object_storage.temp_download(self.s3, bucket, protocol_object_name) as local_path:
            try:
                with CommitteeMeetingProtocol.get_from_filename(local_path) as parsed:
                    object_storage.write(self.s3, bucket, text_object_name, parsed.text, public_bucket=True)
                    self._parse_protocol_parts(bucket, parts_object_name, parsed)
            except (
                    # see https://github.com/hasadna/knesset-data-pipelines/issues/15
                    AntiwordException,
                    subprocess.SubprocessError,
                    # see https://github.com/hasadna/knesset-data-pipelines/issues/32
                    xml.etree.ElementTree.ParseError
            ):
                logging.exception("committee {} meeting {}: failed to parse doc file, skipping".format(committee_id, meeting_id))
                return False
        return True
项目:pycompatlayer    作者:ale5000-git    | 项目源码 | 文件源码
def _subprocess_called_process_error(already_exist, subprocess_lib):
    """Create the CalledProcessError class used by the compatibility layer.

    When ``already_exist`` is true, a subclass of
    ``subprocess_lib.CalledProcessError`` is built whose constructor accepts
    ``output`` and ``stderr`` even if the base class does not. Otherwise a
    new ``CalledProcessError`` deriving from ``subprocess_lib.SubprocessError``
    is defined. The chosen class is stored in
    ``_InternalReferences.UsedCalledProcessError``.
    """
    if not already_exist:
        class CalledProcessError(subprocess_lib.SubprocessError):
            """Raised when a process run by check_call() or check_output()
            returns a non-zero exit status."""

            def __init__(self, returncode, cmd, output=None, stderr=None):
                message = "Command '{0}' returned non-zero exit status {1}".format(
                    str(cmd), str(returncode))
                subprocess_lib.SubprocessError.__init__(self, message)
                self.returncode = returncode
                self.cmd = cmd
                self.output = output
                self.stdout = output
                self.stderr = stderr

        _InternalReferences.UsedCalledProcessError = CalledProcessError
        return

    class ExtCalledProcessError(subprocess_lib.CalledProcessError):
        """Raised when a process run by check_call() or check_output()
        returns a non-zero exit status."""

        def __init__(self, returncode, cmd, output=None, stderr=None):
            base_init = super(ExtCalledProcessError, self).__init__
            # Try the richest base signature first and fall back to older
            # ones, filling in by hand whatever the base did not accept.
            try:
                base_init(returncode=returncode, cmd=cmd,
                          output=output, stderr=stderr)
            except TypeError:
                try:
                    base_init(returncode=returncode, cmd=cmd, output=output)
                except TypeError:
                    base_init(returncode=returncode, cmd=cmd)
                    self.output = output
                self.stdout = output
                self.stderr = stderr

    _InternalReferences.UsedCalledProcessError = ExtCalledProcessError


# API
项目:pycompatlayer    作者:ale5000-git    | 项目源码 | 文件源码
def fix_subprocess(override_debug=False, override_exception=False):
    """Activate the subprocess compatibility."""
    import subprocess

    # Exceptions: provide SubprocessError and CalledProcessError if missing.
    if subprocess.__dict__.get("SubprocessError") is None:
        subprocess.SubprocessError = _Internal.SubprocessError
    if _InternalReferences.UsedCalledProcessError is None:
        has_native = "CalledProcessError" in subprocess.__dict__
        _subprocess_called_process_error(has_native, subprocess)
        if not has_native:
            subprocess.CalledProcessError = _InternalReferences.UsedCalledProcessError

    def _check_output(*args, **kwargs):
        # Fallback check_output: run the command, capture stdout and raise
        # the compat CalledProcessError on a non-zero exit status.
        if "stdout" in kwargs:
            raise ValueError("stdout argument not allowed, "
                             "it will be overridden.")
        process = subprocess.Popen(*args, stdout=subprocess.PIPE, **kwargs)
        stdout_data = process.communicate()[0]
        ret_code = process.poll()
        if ret_code is None:
            raise RuntimeWarning("The process is not yet terminated.")
        if ret_code:
            cmd = kwargs.get("args")
            if cmd is None:
                cmd = args[0]
            raise _InternalReferences.UsedCalledProcessError(returncode=ret_code, cmd=cmd, output=stdout_data)
        return stdout_data

    # Only install the fallback when subprocess lacks check_output.
    if not hasattr(subprocess, "check_output"):
        subprocess.check_output = _check_output
项目:iotracer    作者:b-com    | 项目源码 | 文件源码
def __init__(self, filename, max_events=0):
        """Register ``filename`` with the iotracer kernel module.

        Resolves the block device holding ``filename`` with a shell
        pipeline, writes an ``add`` command to /proc/iotracer/control and
        initialises the log from the resulting /proc/iotracer directory.

        Args:
            filename: path of the file to monitor.
            max_events: appended to the ``add`` command when non-zero.

        Raises:
            AssertionError: if /proc/iotracer does not exist.
            subprocess.SubprocessError: if the block device lookup fails.
            OSError: if the file cannot be stat'ed or the control file
                cannot be written.
        """
        import shlex

        if not os.path.exists("/proc/iotracer"):
            raise AssertionError('iotracer kernel module is not loaded')

        self._filename = filename
        # Quote the path so spaces or shell metacharacters in it cannot
        # break (or inject into) the pipeline.
        cmd = 'readlink -e $(df --output=source ' + shlex.quote(filename) + ' | tail -1)'
        cmd += ' | cut -d/ -f3'
        try:
            out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL,
                                          universal_newlines=True,
                                          shell=True)
        except subprocess.SubprocessError:
            print('fail to get block device for %s' % filename)
            raise
        else:
            # universal_newlines=True makes `out` a str, so the newline to
            # strip must be a str as well (a bytes argument raises TypeError).
            bdev = out.rstrip('\n')
            try:
                ino = os.stat(filename).st_ino
                with open('/proc/iotracer/control', 'w') as fctl:
                    cmd = 'add %s' % self._filename
                    if max_events:
                        cmd += ' %s' % max_events
                    print(cmd, file=fctl)
            except OSError:
                print('fail to add %s to iotracer monitoring' % filename)
                raise
            else:
                self._procdir = '/proc/iotracer/%s_%s' % (bdev, ino)
                IoTracerLog.__init__(self, self._procdir + '/log')
项目:odl-video-service    作者:mitodl    | 项目源码 | 文件源码
def sync_local_to_s3(local_video_records_done_folder,
                     s3_bucket_name,
                     s3_sync_result_file):
    """
    Sync local files to specified S3 bucket

    Logs and notifies the Slack channel about the outcome; on failure the
    process exits.

    Args:
      local_video_records_done_folder (str): local folder containing video
        files ready to be copied to S3.
      s3_bucket_name (str): s3 bucket name
      s3_sync_result_file (str): file that the shell redirects the
        `aws s3 sync` standard output into.
    """
    s3_sync_cmd = 'aws s3 sync {} s3://{} > "{}"'.format(local_video_records_done_folder,
                                                         s3_bucket_name,
                                                         s3_sync_result_file)
    try:
        cmd_output = subprocess.run(s3_sync_cmd,
                                    check=True,
                                    shell=True,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
    except subprocess.SubprocessError as err:
        logger.exception("Failed to sync local files to s3 bucket")
        notify_slack_channel(f"Sync failed: *{computer_name}* \n `{err}`")
        sys.exit("[-] Failed to sync local files to s3 bucket")
    logger.info("S3 sync successfully ran: {}", cmd_output)
    # The f-string already stringifies the value; the previous "str({...})"
    # form put the literal text "str(" into the Slack message.
    notify_slack_channel(f"Sync succeeded on: *{computer_name}* \n `{cmd_output}`")
    logger.info("Syncing complete")
项目:gitlab-merge-request    作者:anti-social    | 项目源码 | 文件源码
def git_cmd(self, args):
        """Run ``git`` with ``args`` and return its stdout, decoded and stripped.

        A non-zero exit status, a missing git executable and any
        SubprocessError are each reported through ``err``.
        """
        command = ['git'] + args
        try:
            completed = subprocess.run(command, stdout=subprocess.PIPE)
            if completed.returncode != 0:
                err('%s command exited with error: %s',
                    ' '.join(command), completed.returncode)
            return completed.stdout.decode('utf-8').strip()
        except FileNotFoundError as exc:
            err('Cannot find git command: %s', exc)
        except subprocess.SubprocessError as exc:
            err('Error running git command: %s', exc)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_preexec_exception(self):
        """An exception raised inside preexec_fn must reach the parent.

        Accepts either a SubprocessError (only when
        ``subprocess._posixsubprocess`` is truthy) or the original ValueError
        whose message mentions "coconut".
        """
        def raise_it():
            raise ValueError("What if two swallows carried a coconut?")

        command = [sys.executable, "-c", ""]
        try:
            child = subprocess.Popen(command, preexec_fn=raise_it)
        except subprocess.SubprocessError:
            self.assertTrue(
                    subprocess._posixsubprocess,
                    "Expected a ValueError from the preexec_fn")
        except ValueError as exc:
            self.assertIn("coconut", exc.args[0])
        else:
            self.fail("Exception raised by preexec_fn did not make it "
                      "to the parent process.")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_preexec_errpipe_does_not_double_close_pipes(self):
        """Issue16140: Don't double close pipes on preexec error."""

        def raise_it():
            raise subprocess.SubprocessError(
                    "force the _execute_child() errpipe_data path.")

        command = [sys.executable, "-c", "pass"]
        # All three standard streams are pipes; the preexec_fn failure must
        # surface as SubprocessError.
        with self.assertRaises(subprocess.SubprocessError):
            self._TestExecuteChildPopen(
                        self, command,
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE, preexec_fn=raise_it)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_preexec_exception(self):
        """An exception raised inside preexec_fn must reach the parent.

        Accepts either a SubprocessError (only when
        ``subprocess._posixsubprocess`` is truthy) or the original ValueError
        whose message mentions "coconut".
        """
        def raise_it():
            raise ValueError("What if two swallows carried a coconut?")

        command = [sys.executable, "-c", ""]
        try:
            child = subprocess.Popen(command, preexec_fn=raise_it)
        except subprocess.SubprocessError:
            self.assertTrue(
                    subprocess._posixsubprocess,
                    "Expected a ValueError from the preexec_fn")
        except ValueError as exc:
            self.assertIn("coconut", exc.args[0])
        else:
            self.fail("Exception raised by preexec_fn did not make it "
                      "to the parent process.")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_preexec_errpipe_does_not_double_close_pipes(self):
        """Issue16140: Don't double close pipes on preexec error."""

        def raise_it():
            raise subprocess.SubprocessError(
                    "force the _execute_child() errpipe_data path.")

        command = [sys.executable, "-c", "pass"]
        # All three standard streams are pipes; the preexec_fn failure must
        # surface as SubprocessError.
        with self.assertRaises(subprocess.SubprocessError):
            self._TestExecuteChildPopen(
                        self, command,
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE, preexec_fn=raise_it)
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def test_install_error(self, mock_sys, mock_logger, mock_exists,
                           mock_subprocess):
        """Test that an install raising SubprocessError fails and is logged."""
        # The package is reported as not installed and the mocked
        # subprocess call raises.
        mock_exists.return_value = False
        mock_subprocess.side_effect = [subprocess.SubprocessError]

        installed = package.install_package(TEST_NEW_REQ)

        self.assertFalse(installed)
        self.assertEqual(mock_logger.exception.call_count, 1)
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def send_message(self, message="", **kwargs):
        """Send a message to a command line.

        ``self.command`` is run through the shell with ``message`` written
        to its stdin; a non-zero exit status or a SubprocessError is logged.
        """
        try:
            process = subprocess.Popen(self.command, universal_newlines=True,
                                       stdin=subprocess.PIPE, shell=True)
            process.communicate(input=message)
            command_failed = process.returncode != 0
            if command_failed:
                _LOGGER.error('Command failed: %s', self.command)
        except subprocess.SubprocessError:
            _LOGGER.error('Error trying to exec Command: %s', self.command)
项目:build    作者:freenas    | 项目源码 | 文件源码
def run(self):
        """Run ``run.py`` of every suite in ``self.test_suites``.

        For each suite the working directory is changed to the suite, the
        script is started with the venv interpreter and its combined
        stdout/stderr is captured and printed. A suite error is generated
        when the process cannot be started, exceeds ``manifest['timeout']``
        (the process is then killed) or exits with a non-zero status.
        """
        for s in self.test_suites:
            script = os.path.join(s, 'run.py')
            start_time = time.time()
            manifest = self.load_manifest(s)
            os.chdir(s)
            results_path = os.path.join(s, 'results.xml')

            info("Running tests from {0}".format(s))

            args = [e('${venvdir}/bin/python'), script]
            try:
                test = subprocess.Popen(
                    args,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    close_fds=True
                )
            # OSError covers a missing or non-executable interpreter.
            except (OSError, subprocess.SubprocessError) as err:
                self.generate_suite_error(
                    results_path,
                    manifest['name'],
                    time.time() - start_time,
                    'Test could not be started',
                    err
                )
                # There is no process to collect output from.
                continue

            try:
                # communicate() drains the pipe while waiting; wait() with
                # stdout=PIPE can deadlock once the pipe buffer fills up.
                out, _ = test.communicate(timeout=manifest['timeout'])
            except subprocess.TimeoutExpired as err:
                self.generate_suite_error(
                    results_path,
                    manifest['name'],
                    time.time() - start_time,
                    'Test timeout reached',
                    err
                )
                # Kill the child so the final communicate() cannot block
                # forever, then collect whatever output it produced.
                test.kill()
                out, _ = test.communicate()
            else:
                if test.returncode:
                    self.generate_suite_error(
                        results_path,
                        manifest['name'],
                        time.time() - start_time,
                        'Test process has returned an error',
                        out
                    )

            info("{0} error:".format(script))
            print(out.decode('utf-8'))
项目:homeassistant    作者:NAStools    | 项目源码 | 文件源码
def setup(hass, config):
    """Setup the shell_command component."""
    conf = config.get(DOMAIN, {})

    # Maps a configured command string to (program, raw args, compiled
    # template) so each command is split and compiled only once.
    cache = {}

    def service_handler(call):
        """Execute a shell command service."""
        cmd = conf[call.service]

        if cmd in cache:
            prog, args, args_compiled = cache[cmd]
        else:
            if ' ' in cmd:
                prog, args = cmd.split(' ', 1)
                args_compiled = template.Template(args, hass)
            else:
                prog, args, args_compiled = cmd, None, None
            cache[cmd] = prog, args, args_compiled

        rendered_args = None
        if args_compiled:
            try:
                rendered_args = args_compiled.render(call.data)
            except TemplateError as ex:
                _LOGGER.exception('Error rendering command template: %s', ex)
                return

        # Unchanged arguments mean no template was used: keep the default
        # behaviour and run the original string through the shell.
        shell = rendered_args == args
        if not shell:
            # template used. Break into list and use shell=False for security
            cmd = [prog] + shlex.split(rendered_args)

        try:
            subprocess.call(cmd, shell=shell,
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL)
        except subprocess.SubprocessError:
            _LOGGER.exception('Error running command: %s', cmd)

    for name in conf.keys():
        hass.services.register(DOMAIN, name, service_handler)
    return True