Python subprocess 模块,DEVNULL 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.DEVNULL

项目:pbtk    作者:marin-m    | 项目源码 | 文件源码
def handle_file(self, fname):
        """Extract an archive into ``self.name`` and index its entries.

        A file whose first four bytes are ``dex\\n`` is first converted to a
        JAR with dex2jar; the resulting JAR is then processed instead.
        Class names are appended to ``self.classes``, ``.proto`` entries are
        stored in ``self.bonus_protos``, nested ``.dex`` entries are handled
        recursively and ``.so`` entries are passed to ``walk_binary``.
        """
        with open(fname, 'rb') as fd:
            magic = fd.read(4)
            if magic == b'dex\n':
                converted = self.name + '/classes-dex2jar.jar'
                run([dex2jar, fname, '-f', '-o', converted], cwd=self.name, stderr=DEVNULL)
                fname = converted

        with ZipFile(fname) as jar:
            jar.extractall(self.name)

            for entry in jar.namelist():
                extracted = self.name + '/' + entry

                if entry.endswith('.class'):
                    # Drop the ".class" suffix and turn the path into a dotted name.
                    self.classes.append(entry[:-6].replace('/', '.'))
                    continue

                if entry.endswith('.dex'):
                    self.handle_file(extracted)
                    continue

                if entry.endswith('.proto'):
                    self.bonus_protos[entry] = jar.read(entry).decode('utf8')
                    continue

                if entry.endswith('.so'):
                    self.bonus_protos.update(walk_binary(extracted))
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def run_cmake(self):
        """Create a ``build`` directory and run the configured CMake command.

        Prints a diagnostic and returns early when the directory cannot be
        created, when the configured build directory does not exist, or when
        the expected CMake output file is missing afterwards.
        """
        print("Running CMake")
        mkdir_status = subprocess.call(
            ["mkdir", "build"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)
        if mkdir_status != 0:
            print("Can\'t setup CMake build directory.")
            return

        if not self.cmake_build_info["build_dir"].is_dir():
            print("Couldn't setup CMake Project")
            return

        try:
            subprocess.check_output(
                self.cmake_cmd_info["cmake_cmd"],
                cwd=str(self.cmake_build_info["build_dir"]))
        except subprocess.CalledProcessError as err:
            # A failing CMake run is reported but not treated as fatal here;
            # the check below decides whether the project was set up.
            print(err.output)

        if not self.cmake_build_info["comp_data_cmake"].is_file():
            print("Couldn't setup CMake Project")
            return
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def _openDownloadFile(self, buildId, suffix):
        """Download an artifact into a temporary file via the custom command.

        The download command runs under bash with a whitelisted copy of the
        environment plus ``BOB_LOCAL_ARTIFACT`` (temporary file path) and
        ``BOB_REMOTE_ARTIFACT`` (artifact URL).

        Returns a ``CustomDownloader`` wrapping the temporary file. Raises
        ``ArtifactDownloadError`` when the command exits non-zero; in that
        case the temporary file is removed.
        """
        (tmpFd, tmpName) = mkstemp()
        url = self._makeUrl(buildId, suffix)
        try:
            os.close(tmpFd)
            env = dict((k, v) for (k, v) in os.environ.items() if k in self.__whiteList)
            env.update(BOB_LOCAL_ARTIFACT=tmpName, BOB_REMOTE_ARTIFACT=url)
            status = subprocess.call(["/bin/bash", "-ec", self.__downloadCmd],
                stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
                cwd="/tmp", env=env)
            if status != 0:
                raise ArtifactDownloadError("failed (exit {})".format(status))
            # Hand ownership of the file to the downloader so that the
            # cleanup below does not delete it.
            downloaded, tmpName = tmpName, None
            return CustomDownloader(downloaded)
        finally:
            if tmpName is not None: os.unlink(tmpName)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def callGit(self, workspacePath, *args):
        """Run ``git`` with ``args`` inside the SCM directory of the workspace.

        Returns the command's standard output as text; standard error is
        discarded. Raises ``BuildError`` when git exits non-zero.
        """
        cmdLine = ['git'] + list(args)
        checkoutDir = os.path.join(os.getcwd(), workspacePath, self.__dir)
        try:
            return subprocess.check_output(cmdLine, cwd=checkoutDir,
                universal_newlines=True, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            raise BuildError("git error:\n Directory: '{}'\n Command: '{}'\n'{}'".format(
                os.path.join(workspacePath, self.__dir), " ".join(cmdLine), e.output.rstrip()))

    # Get the GitSCM status. The purpose of this function is to return the status of the given directory.
    #
    # Return values:
    #  - error: The SCM is in an error state. Use this if git returned an error code.
    #  - dirty: SCM is dirty. Could be: modified files, switched to another branch/tag/commit/repo, unpushed commits.
    #  - clean: Same branch/tag/commit as specified in the recipe and no local changes.
    #  - empty: Directory does not exist.
    #
    # This function is called when building with --clean-checkout. 'error' and 'dirty' SCMs are moved to the attic,
    # while empty and clean directories are not.
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def callSubversion(self, workspacePath, *args):
        """Run ``svn`` with ``args`` using ``workspacePath`` as working directory.

        Returns the command's standard output as text; standard error is
        discarded. Raises ``BuildError`` when svn exits non-zero.
        """
        cmdLine = ['svn'] + list(args)

        try:
            return subprocess.check_output(cmdLine, cwd=workspacePath,
                universal_newlines=True, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            raise BuildError("svn error:\n Directory: '{}'\n Command: '{}'\n'{}'".format(
                os.path.join(workspacePath, self.__dir), " ".join(cmdLine), e.output.rstrip()))

    # Get the SvnSCM status. The purpose of this function is to return the status of the given directory.
    #
    # Return values:
    #  - error: The SCM is in an error state. Use this if the svn call returns an error code.
    #  - dirty: SCM is dirty. Could be: modified files, switched to another URL or revision.
    #  - clean: Same URL and revision as specified in the recipe and no local changes.
    #  - empty: Directory does not exist.
    #
    # This function is called when building with --clean-checkout. 'error' and 'dirty' SCMs are moved to the attic,
    # while empty and clean directories are not.
项目:runcommands    作者:wylee    | 项目源码 | 文件源码
def version_getter(config):
    """Return the tag on HEAD, or HEAD's short SHA1 when it is untagged.

    Returns ``None`` when the current directory is not inside a git work
    tree. ``git describe --exact-match`` is used for the tag lookup.

    .. note:: Only annotated tags are considered.

    TODO: Support non-annotated tags?

    """
    def git(*args, **kwargs):
        return check_output(['git'] + list(args), **kwargs)

    try:
        git('rev-parse', '--is-inside-work-tree', stderr=DEVNULL)
    except CalledProcessError:
        return None

    encoding = getpreferredencoding(do_setlocale=False)
    try:
        raw = git('describe', '--exact-match', stderr=DEVNULL)
    except CalledProcessError:
        # HEAD carries no matching tag: fall back to the abbreviated hash.
        raw = git('rev-parse', '--short', 'HEAD')
    return raw.decode(encoding).strip()
项目:punch    作者:lgiordani    | 项目源码 | 文件源码
def _check_system(self):
        """Verify that the repository command can be executed.

        Runs ``self.commands`` with ``--help`` appended. Raises
        ``RepositorySystemError`` when the executable is missing or when it
        exits with a non-zero status.
        """
        null_commands = self.commands + ["--help"]

        # Python 2 reports a missing executable as IOError, Python 3 as
        # FileNotFoundError.
        not_found_exception = IOError if six.PY2 else FileNotFoundError

        try:
            if not six.PY2:
                subprocess.check_call(null_commands, stdout=subprocess.DEVNULL)
            else:
                subprocess.check_output(null_commands)
        except not_found_exception:
            raise RepositorySystemError("Cannot run {}".format(self.command))
        except subprocess.CalledProcessError:
            raise RepositorySystemError(
                "Error running {}".format(self.command))
项目:paperless    作者:lrnt    | 项目源码 | 文件源码
def _convert(self, command_path, input_path, settings=(),
                 prefix='', fileformat='png'):
        """Run a converter command that writes numbered pages.

        The command is invoked as ``command_path *settings input_path
        output_pattern`` where the output pattern is
        ``<dest_pages>/<prefix->%04d.<fileformat>``.

        :param command_path: executable to run
        :param input_path: document passed to the executable
        :param settings: extra command-line arguments placed before the input
        :param prefix: optional file-name prefix (a ``-`` is appended)
        :param fileformat: extension of the generated page files
        :return: list of generated page paths found in ``self.dest_pages``
        :raises ProcessingError: when the command exits non-zero
        """
        if prefix:
            prefix = '{}-'.format(prefix)

        output_path = path.join(
            self.dest_pages,
            '{}%04d.{}'.format(prefix, fileformat)
        )
        # The command's output is never read, so it is discarded instead of
        # piped: waiting on a process whose stdout pipe is not drained can
        # block forever once the pipe buffer fills up.
        exit_code = subprocess.Popen(
            (command_path, *settings, input_path, output_path),
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        ).wait()

        if exit_code != 0:
            raise ProcessingError(
                'Was unable to convert document to {}s ({}).'.format(fileformat,
                                                                     exit_code)
            )

        return glob.glob(path.join(
            self.dest_pages, '{}[0-9][0-9][0-9][0-9].{}'.format(prefix,
                                                                fileformat)
        ))
项目:integration-prototype    作者:SKA-ScienceDataProcessor    | 项目源码 | 文件源码
def run_vis(self, config):
        """Run the CSP visibility sender emulator and wait for it to finish.

        Polls the master's stdout before and after the run, and prints the
        emulator's captured stdout/stderr when ``DEBUG`` is set.

        :param config: argument passed to the emulator module on its
            command line
        """
        self.poll_master_stdout(5, 'Waiting to receive',
                                'run_vis initialisation')

        pid = subprocess.Popen(['/usr/bin/python', '-m',
                                'sip.emulators.csp_visibility_sender',
                                config],
                               stdin=subprocess.DEVNULL,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        # communicate() drains both pipes while waiting for the process.
        # Calling wait() first can deadlock once the child fills a pipe
        # buffer that nobody is reading.
        output = pid.communicate()
        if DEBUG:
            print('--------------------------')
            print('nvis_sender, stdout:')
            print('--------------------------')
            print(output[0].decode("utf-8"))
            print('--------------------------')
            print('nvis_sender, stderr:')
            print('--------------------------')
            print(output[1].decode("utf-8"))

        self.poll_master_stdout(5, 'Received heap 9', 'run_vis termination')
项目:solace-messaging-cf-dev    作者:SolaceLabs    | 项目源码 | 文件源码
def getDeployedIps(deploymentName):
    """Return the static IPs of a deployed BOSH deployment.

    Returns ``None`` when ``bosh -e lite deployments`` lists no line
    matching ``deploymentName``. Otherwise returns a dict with a
    ``"global"`` entry (the ``"static"`` value of the test subnet) and one
    entry per job whose name is in ``commonUtils.POOL_TYPES``, holding the
    job's first network's ``static_ips``.
    """
    # NOTE(review): deploymentName is interpolated into a shell command line;
    # callers must not pass untrusted input here.
    deploymentCount = subprocess.check_output("bosh -e lite deployments | grep {} | wc -l".format(deploymentName),
        shell=True, stderr=subprocess.DEVNULL)

    if int(deploymentCount) == 0:
        return None

    # safe_load only builds plain Python objects; yaml.load without an
    # explicit Loader is unsafe and is rejected by PyYAML 6 and later.
    deployedManifest = yaml.safe_load(
            subprocess.check_output("bosh -e lite -d {} manifest".format(deploymentName),
            shell=True, stderr=subprocess.DEVNULL))

    deployedIpConfig = {}
    deployedIpConfig["global"] = getTestSubnet(deployedManifest)["static"]
    deployedJobs = [j for j in deployedManifest["jobs"] if j["name"] in commonUtils.POOL_TYPES]

    for job in deployedJobs:
        deployedIpConfig[job["name"]] = job["networks"][0]["static_ips"]

    return deployedIpConfig
项目:camisole    作者:prologin    | 项目源码 | 文件源码
def find_class_having_main(self, classes):
        """Return the stem of the first class file that declares ``main``.

        Each file is disassembled with the configured disassembler (``-s``
        prints type signatures). A class qualifies when a ``public static
        ... void main(`` line is immediately followed by the descriptor
        ``PSVMAIN_DESCRIPTOR``.

        :param classes: iterable of path objects pointing at class files
        :return: ``file.stem`` of the first match, or ``None`` if none match
        """
        for file in classes:
            # run javap(1) with type signatures
            try:
                stdout = subprocess.check_output(
                    [self.extra_binaries['disassembler'].cmd, '-s', str(file)],
                    stderr=subprocess.DEVNULL, env=self.compiler.env)
            except subprocess.SubprocessError:  # noqa
                continue
            # iterate on lines to find p s v main() signature and then
            # its descriptor on the line below; we don't rely on the type
            # from the signature, because it could be String[], String... or
            # some other syntax I'm not even aware of
            lines = iter(stdout.decode().split('\n'))
            for line in lines:
                line = line.lstrip()
                if line.startswith('public static') and 'void main(' in line:
                    # Default to '' so a signature on the very last line does
                    # not leak StopIteration out of this method.
                    if next(lines, '').lstrip() == PSVMAIN_DESCRIPTOR:
                        return file.stem
        return None
项目:snappy    作者:ricci    | 项目源码 | 文件源码
def runTarsnap(args, timeout = None):
    """Run tarsnap with ``args`` and return its captured result.

    The command is ``config.tarsnap_bin`` followed by
    ``config.tarsnap_extra_args`` and ``args``. If ``timeout`` expires the
    process is sent SIGQUIT and then waited for. A non-zero exit status
    terminates the program via ``sys.exit`` with the captured output.
    """
    command = [config.tarsnap_bin] + config.tarsnap_extra_args + args
    proc = subprocess.Popen(command,
                stdout = subprocess.PIPE, stderr = subprocess.PIPE,
                stdin = subprocess.DEVNULL, universal_newlines = True)
    result = CheapoCompletedProcess()

    timed_out = False
    try:
        captured = proc.communicate(timeout = timeout)
    except subprocess.TimeoutExpired:
        timed_out = True
        print("Tarsnap timed out, sending SIGQUIT...")
        proc.send_signal(signal.SIGQUIT)
        captured = proc.communicate()

    result.stdout, result.stderr = captured
    result.returncode = proc.wait()
    if timed_out:
        print("Tarsnap finished")

    if result.returncode:
        sys.exit("Error running tarsnap:\nCommand: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n".format(" ".join(command),result.stdout,result.stderr))
    return result
项目:skastic    作者:mypalmike    | 项目源码 | 文件源码
def parse_text(self, img):
    """Recognise the text of this object's bounding box with tesseract.

    The bounding box is saved to ``obj_<id>.png`` and tesseract is run on it
    with page segmentation mode 7 and a restricted character whitelist. The
    stripped, lower-cased result is stored in ``self.text``.
    """
    filename = "obj_{}.png".format(id(self))
    self.save_bounding_box(img, filename)

    options = [
        '--psm', '7',
        '-l', 'eng+equ',
        '-c', 'tessedit_char_whitelist=abcdefghijklmnopqrstuvwxyz0123456789=+-*/',
    ]
    # 'stdout' tells tesseract to print the result instead of writing a file.
    cmd = ['tesseract', filename, 'stdout'] + options

    completed = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    recognised = completed.stdout.decode('ascii').strip().lower()
    self.text = recognised or ''
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def running_old_sell(manager, installer):
    """Tell whether the VirtualBox VM "21" has a ``bridgeadapter3`` entry.

    Returns ``False`` when any dependency is reported as not installed, when
    the manager's machine is not a ``Two1MachineVirtual``, or when
    ``VBoxManage`` exits with an error.
    """
    for _package, installed in installer.check_dependencies():
        if installed is False:
            return False

    if not isinstance(manager.machine, Two1MachineVirtual):
        return False

    try:
        vbox_conf = subprocess.check_output(["VBoxManage", "showvminfo", "21", "--machinereadable"],
                                            stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return False

    if type(vbox_conf) is bytes:
        vbox_conf = vbox_conf.decode()
    return "bridgeadapter3" in vbox_conf
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def _start_sell_service(self, service_name, failed_to_start_hook, started_hook, failed_to_up_hook, up_hook,
                            timeout=Two1Composer.SERVICE_START_TIMEOUT):
        """Start one docker-compose service and report progress via hooks.

        :param service_name: name of the compose service to bring up
        :param failed_to_start_hook: called with the name if ``docker-compose
            up`` fails
        :param started_hook: called with the name once ``up`` succeeded
        :param failed_to_up_hook: called with the name if the readiness probe
            is still running when ``timeout`` elapses
        :param up_hook: called with the name when the probe finished in time
        :param timeout: seconds to wait for the readiness probe
        """
        try:
            subprocess.check_output(["docker-compose", "-f", Two1Composer.COMPOSE_FILE, "up", "-d", service_name],
                                    stderr=subprocess.DEVNULL, env=self.machine_env)
        except subprocess.CalledProcessError:
            failed_to_start_hook(service_name)
        else:
            started_hook(service_name)
            if service_name == 'router':
                time.sleep(5)
            elif service_name != 'base':
                # time.clock() was removed in Python 3.8; a monotonic clock
                # is the right tool for measuring a timeout.
                start = time.monotonic()

                # Probe the service from inside the router container.
                exec_id = self.docker_client.exec_create('sell_router', "curl %s:5000" % service_name)['Id']
                self.docker_client.exec_start(exec_id)
                running = True

                while time.monotonic() - start < timeout and running is True:
                    running = self.docker_client.exec_inspect(exec_id)['Running']

                if running is True:
                    failed_to_up_hook(service_name)
                else:
                    up_hook(service_name)
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def start_networking(self):
        """ Start ZeroTier daemon.

        Returns 0 when the service is (or becomes) active within 60 seconds
        and 1 otherwise.
        """
        if self.status_networking():
            return 0

        try:
            subprocess.Popen(['sudo', 'service', 'zerotier-one', 'start'], stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError:
            return 1

        # Poll until the service reports as running or the deadline passes.
        deadline = time.time() + 60
        while time.time() <= deadline:
            if self.status_networking():
                return 0
        return 1
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def status_networking(self):
        """ Get status of ZeroTier One service.

        Copies the ``ps_zerotier.sh`` helper to the machine when it is not
        there yet, runs it and returns ``True`` if any output line mentions
        ``zerotier-one -d``. Returns ``False`` when a command fails.
        """
        try:
            remote_files = self.docker_ssh("ls", stderr=subprocess.DEVNULL).decode().split("\n")
            if "ps_zerotier.sh" not in remote_files:
                local_script = os.path.join(
                    os.path.dirname(os.path.abspath(__file__)),
                    "util",
                    "scripts",
                    "ps_zerotier.sh")
                subprocess.check_output(["docker-machine",
                                         "scp",
                                         local_script,
                                         "21:~/ps_zerotier.sh"])
            self.docker_ssh("chmod a+x ~/ps_zerotier.sh", stderr=subprocess.DEVNULL)
            listing = self.docker_ssh("./ps_zerotier.sh", stderr=subprocess.DEVNULL).decode()
            return any("zerotier-one -d" in entry for entry in listing.split("\n"))
        except subprocess.CalledProcessError:
            return False
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def connect_market(self, client, market):
        """Join the given market's ZeroTier network from the machine.

        Reads the machine's ZeroTier address, asks ``client`` to join
        ``market`` with it and, on success, joins the returned network.
        A 400 response is logged as an invalid network; other server errors
        are re-raised. Command failures are logged. Always sleeps ten
        seconds before returning.
        """
        try:
            info = self.docker_ssh("sudo ./zerotier-cli info -j",
                                   stderr=subprocess.DEVNULL).decode()
            zt_device_address = json.loads(info)["address"]
            response = client.join(market, zt_device_address)
            if response.ok:
                network_id = response.json().get("networkid")
                self.docker_ssh("sudo ./zerotier-cli join %s" % network_id, stderr=subprocess.DEVNULL)
                # The confirmation result is intentionally not acted upon.
                if self.wait_for_zt_confirmation():
                    pass
        except exceptions.ServerRequestError as e:
            if e.status_code != 400:
                raise e
            logger.info(uxstring.UxString.invalid_network)
        except subprocess.CalledProcessError as e:
            logger.info(str(e))
        time.sleep(10)  # wait for interface to come up
        return
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def _get_market_address(self):
        """ Get status of 21mkt network connection.

        Returns:
            zt_ip (str): ZeroTier IP address in brackets, or an empty string
                when the 21mkt network is absent, not OK, has no matching
                address, or any error occurs.
        """
        try:
            zt_conf = self.docker_ssh("sudo ./zerotier-cli listnetworks -j", stderr=subprocess.DEVNULL)
            if type(zt_conf) == bytes:
                zt_conf = zt_conf.decode()
            for net in json.loads(zt_conf):
                if net["name"] != "21mkt":
                    continue
                if net["status"] != "OK":
                    return ""
                # Only the first 21mkt entry is considered.
                for addr in net["assignedAddresses"]:
                    potential_match = Two1Machine.ZEROTIER_CLI_IPV6_PATTERN.search(addr)
                    if potential_match is not None:
                        return "[%s]" % potential_match.group(0)
                return ""
            return ""
        except Exception:
            return ""
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def status_machine(self):
        """ Get status of virtual machine.

        Runs ``docker-machine status <name>`` and maps its output to a
        ``VmState``: ``RUNNING``, ``STOPPED`` or ``UNKNOWN``. Any error while
        querying (for example a failing command) yields ``NOEXIST``.
        """
        try:
            status = subprocess.check_output(["docker-machine", "status",
                                             self.name],
                                             stderr=subprocess.DEVNULL).decode().rstrip().lower()
        except Exception:
            # Deliberately broad, but no longer a bare ``except`` so that
            # KeyboardInterrupt and SystemExit are not swallowed.
            return VmState.NOEXIST

        if status == "running":
            return VmState.RUNNING
        if status == "stopped":
            return VmState.STOPPED
        return VmState.UNKNOWN

    # private methods
项目:redisrwlock    作者:veshboo    | 项目源码 | 文件源码
def runRedisServer(port=6379):
    """Run redis-server and wait until it accepts connections.

    Reads the server's startup messages until the line containing
    ``_REDIS_READY_MESSAGE`` followed by the port appears, then redirects
    any following output to DEVNULL through a ``cat`` process.

    :param port: TCP port for the server to listen on
    :return: tuple ``(server, dumper)`` of the two ``Popen`` objects
    :raises RuntimeError: if the server closes its output before it
        reported being ready
    """
    port = str(port)
    server = subprocess.Popen(['redis-server', '--port', port],
                              stdout=subprocess.PIPE,
                              universal_newlines=True)
    message = _REDIS_READY_MESSAGE + port
    while True:
        line = server.stdout.readline()
        if message in line:
            break
        if not line:
            # readline() returns '' only at EOF: the server has gone away,
            # so looping further would spin forever.
            server.wait()
            raise RuntimeError(
                'redis-server exited before it was ready (exit code {})'.format(
                    server.returncode))
    dumper = subprocess.Popen(['cat'],
                              stdin=server.stdout,
                              stdout=subprocess.DEVNULL)
    return server, dumper
项目:RSVPBot    作者:recursecenter    | 项目源码 | 文件源码
def devserver(port):
    """Run the development server on ``port`` for the duration of a block.

    Points ``config.rc_root`` and ``config.rc_api_root`` at the local
    server, starts it with ``PORT`` set in its environment, yields once and
    finally kills the server process.
    """
    config.rc_root = 'http://localhost:{}'.format(port)
    config.rc_api_root = config.rc_root + '/api/v1'

    child_env = dict(os.environ)
    child_env['PORT'] = str(port)
    proc = subprocess.Popen(
        ['python', 'devserver/__init__.py'],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        env=child_env
    )

    # wait for the dev server to come up
    time.sleep(1)

    try:
        yield
    finally:
        proc.kill()
        proc.wait()
项目:bridge-test    作者:Half-Shot    | 项目源码 | 文件源码
def clone(self, repo, branch="master"):
        """Make ``repo`` at ``branch`` available in the temporary directory.

        A repository already registered in ``self.repos`` is reused and
        ``None`` is returned. A checkout that already exists on disk is
        pulled; otherwise the repository is cloned. The location is then
        registered and returned.
        """
        location_index = repo+'#'+branch
        if self.repos.get(location_index) is not None:
            logger.debug("Repo %s already exists, reusing old." % location_index)
            return

        location = os.path.join(tempfile.gettempdir(), os.path.basename(location_index))
        if not os.path.exists(location):
            logger.debug("Cloning %s with branch %s" % (repo, branch))
            clone_cmd = ["git", "clone", "-b", branch, repo, location]
            subprocess.run(
                clone_cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            ).check_returncode()
        else:
            logger.debug("Repo %s with branch %s already exists on disk, reusing and pulling." % (repo, branch))
            self.pull(location)

        self.repos[location_index] = location
        self.copies[location_index] = []
        return location
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def _instancecheck_impl(self, value, info: Info = NoInfo()):
        """Check that ``value`` is a list of event names accepted by perf.

        The value must be a list of strings. When perf is available, the
        names (without ``wall-clock``) are passed to ``perf stat -e`` on a
        trivial command; a positive exit status makes the check fail with
        the first line of perf's error output.
        """
        if not isinstance(value, List(Str())):
            return info.errormsg(self)
        if not is_perf_available():
            return info.wrap(True)
        assert isinstance(value, list)

        props = value
        if "wall-clock" in props:
            # wall-clock is not a perf event; test the remaining names only,
            # working on a copy so the caller's list stays untouched.
            props = props.copy()
            props.remove("wall-clock")

        cmd = "perf stat -x ';' -e {props} -- /bin/echo".format(props=",".join(props))
        proc = subprocess.Popen(["/bin/sh", "-c", cmd], stdout=subprocess.DEVNULL,
                                stderr=subprocess.PIPE, universal_newlines=True)
        _, err = proc.communicate()
        if proc.poll() > 0:
            first_line = str(err).split("\n")[0].strip()
            return info.errormsg(self, "Not a valid properties list: " + first_line)
        return info.wrap(True)
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def _exec(self, cmd: str, fail_on_error: bool = False,
              error_message: str = "Failed executing {cmd!r}: out={out!r}, err={err!r}",
              timeout: int = 10) -> bool:
        """
        Execute the passed command.

        The command runs through ``/bin/sh -c``. Its output is captured only
        when ``fail_on_error`` is set, otherwise it is discarded.

        :param cmd: shell command line to execute
        :param fail_on_error: call ``self._fail`` with the formatted error message if the command fails
        :param error_message: error message format
        :param timeout: time in seconds after which the command is aborted
        :return: Was the command executed successfully?
        :raises subprocess.TimeoutExpired: if the command did not finish in time
        """
        out_mode = subprocess.PIPE if fail_on_error else subprocess.DEVNULL
        proc = subprocess.Popen(["/bin/sh", "-c", cmd], stdout=out_mode, stderr=out_mode,
                                universal_newlines=True)
        try:
            out, err = proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            # communicate() does not stop the child on timeout; kill and
            # reap it so that no stray process is left behind.
            proc.kill()
            proc.communicate()
            raise
        if proc.wait() > 0:
            if fail_on_error:
                self._fail(error_message.format(cmd=cmd, out=out, err=err))
            else:
                return False
        return True
项目:rp2017-codegolf    作者:ReflectionsProjections    | 项目源码 | 文件源码
def cpp_verify(data, user_file):
    """Compile the user's C++ answer against the task header and run it.

    The answer is copied to ``answer.cpp`` and ``task/test.h`` to ``test.h``
    in the current directory, both are handed to ``g++`` and the resulting
    ``./test`` binary is run with ``data`` on its standard input.

    :param data: bytes fed to the test binary's stdin
    :param user_file: path of the user's answer source file
    :return: the test binary's standard output
    :raises SystemExit: (code 1) when the task header is missing or the
        answer does not compile
    """
    with open(user_file) as source:
        answer = source.read()

    try:
        # write to temp file
        with open('answer.cpp', 'w') as temp:
            temp.write(answer)
        try:
            with open('task/test.h') as header:
                test = header.read()
            with open('test.h', 'w') as temp:
                temp.write(test)
        except IOError: # user has not requested task yet
            print("A task folder could not be found. Request a task\nusing python client.py request <task_id>")
            sys.exit(1) # terminate program
        proc = sp.Popen(['g++', 'answer.cpp', 'test.h', '-o', 'test'], stderr=sp.DEVNULL, stdout=sp.DEVNULL)
        proc.wait()
        if proc.returncode:
            print("Answer failed to compile.")
            sys.exit(1)
        proc = sp.Popen(['./test'], stdin=sp.PIPE, stdout=sp.PIPE)
        test_result, _ = proc.communicate(input=data)
        return test_result
    finally:
        # dispose of temporary files, also when exiting early
        for leftover in ('answer.cpp', 'test.h', 'test'):
            try:
                os.remove(leftover)
            except OSError:
                pass
项目:pyhttps    作者:talhasch    | 项目源码 | 文件源码
def create_ssl_cert():
    """Create a self-signed certificate at ``ssl_cert_path`` with openssl.

    Key and certificate are written to the same file and are valid for 365
    days. Logs an error and exits with status 1 when
    ``OpenSslExecutableNotFoundError`` is raised by the call.
    """
    # subprocess.DEVNULL only exists on Python 3; fall back to an explicit
    # handle on os.devnull otherwise.
    if PY3:
        from subprocess import DEVNULL
    else:
        DEVNULL = open(os.devnull, 'wb')

    try:
        subject = '/CN=www.talhasch.com/O=Talhasch Inc./C=TR'
        ssl_exec_list = ['openssl', 'req', '-new', '-x509']
        ssl_exec_list += ['-keyout', ssl_cert_path, '-out', ssl_cert_path]
        ssl_exec_list += ['-days', '365', '-nodes', '-subj', subject]
        call(ssl_exec_list, stdout=DEVNULL, stderr=DEVNULL)
    except OpenSslExecutableNotFoundError:
        logging.error('openssl executable not found!')
        exit(1)

    logging.info('Self signed ssl certificate created at {}'.format(ssl_cert_path))
项目:ml-utils    作者:LinxiFan    | 项目源码 | 文件源码
def _DEPREC_bash_run(cmd, suppress_err=0):
    """Run ``cmd`` in an interactive bash shell (deprecated helper).

    Standard error is discarded when ``suppress_err`` is truthy and merged
    into standard output otherwise. The decoded output is computed but not
    returned.
    """
    if not U.f_exists('~/.bashrc'):
        print('WARNING: ~/.bashrc not found. '
              'bash_output() will not be able to read the aliases.', file=sys.stderr)
    # hack: if you don't echo something first, your alias such as `ll` will
    # kill the script after it's run.
    # https://stackoverflow.com/questions/45558993/subprocess-interactive-bash-behavior
    cmd = 'printf ""; ' + cmd
    if suppress_err:
        err = pc.DEVNULL
    else:
        err = pc.STDOUT
    raw_output = pc.check_output(['/bin/bash', '-i', '-c', cmd],
                                 stderr=err)
    proc = raw_output.decode()
    # An earlier variant returned the output and, when suppress_err was set,
    # printed the failed call's output before re-raising CalledProcessError;
    # it is disabled.
项目:WiFree    作者:Ablablab    | 项目源码 | 文件源码
def __start(self, args):
        """Launch aireplay with ``args`` plus the mandatory flags.

        The BSSID flag (with ``self._bssid``) and ``--ignore-negative-one``
        are appended when missing, and the monitor interface is added last.
        The process handle is stored in ``self._proc``.

        :param args: list of aireplay command-line flags; it is not modified
        :return: always ``True``
        """
        # build aireplay flags on a copy so the caller's list is untouched
        flags = list(args)

        if self.DEAUTH_BSSID_FLAG not in flags:
            flags.extend([self.DEAUTH_BSSID_FLAG, self._bssid])
        if '--ignore-negative-one' not in flags:
            flags.append("--ignore-negative-one")

        # build Popen command as list
        cmd = [self.AIREPLAY] + flags + [self._miff]

        print("\t" + " ".join(cmd))

        # Only PATH is passed on to the child's environment.
        self._proc = Popen(cmd, env={'PATH': os.environ['PATH']},
                           stderr=DEVNULL, stdin=DEVNULL, stdout=DEVNULL)

        return True
项目:BiliLive    作者:hr3lxphr6j    | 项目源码 | 文件源码
def cut_video(input_file: str, output_file: str, start_time: str, end_time: str, is_concat: bool = True):
    """
    Cut a time range out of a video with ffmpeg, copying the streams.

    :param input_file: path of the input video, or, when ``is_concat`` is
        set, the text written to ffmpeg's standard input as the concat script
    :param output_file: path of the file ffmpeg writes (overwritten if present)
    :param start_time: value passed to ffmpeg's ``-ss`` option
    :param end_time: value passed to ffmpeg's ``-to`` option
    :param is_concat: read a Virtual Concatenation Script from standard input
        instead of opening ``input_file`` directly
    :return: None
    """
    ffmpeg_command = ['ffmpeg', '-y']
    if is_concat:
        ffmpeg_command.extend(['-protocol_whitelist', 'file,pipe',
                               '-safe', '0',
                               '-f', 'concat',
                               '-i', '-'])
    else:
        ffmpeg_command.extend(['-i', input_file])
    ffmpeg_command.extend(['-ss', start_time,
                           '-to', end_time,
                           '-c', 'copy',
                           output_file])

    stdin_data = None
    if is_concat:
        # The pipe is opened in binary mode, so text must be encoded before
        # it is handed to communicate(); bytes are passed through unchanged.
        stdin_data = input_file.encode() if isinstance(input_file, str) else input_file

    child = subprocess.Popen(ffmpeg_command, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)
    child.communicate(stdin_data)
项目:autograder    作者:robertu94    | 项目源码 | 文件源码
def build_docker(settings, student):
    """
    Creates a new docker container for the student

    Runs ``docker build`` in the student's directory, tagging the image
    ``<username>_<project name>`` and using the configured dockerfile.

    :param settings: mapping providing ``project.name``, ``build.dockerfile``
        and ``build.timeout``
    :param student: mapping providing ``username`` and ``directory``
    :raises subprocess.CalledProcessError: if the build exits non-zero
    :raises subprocess.TimeoutExpired: if the build exceeds the timeout
    """
    LOGGER.info('Beginning a Docker build for student %s', student['username'])

    tag = '{student}_{project}'.format(student=student['username'],
                                       project=settings['project']['name'])
    dockerfile_option = '--file={dockerfile}'.format(
        dockerfile=settings['build']['dockerfile'])
    # Pass an argument vector rather than a shell string so that student
    # controlled values cannot be interpreted by a shell.
    cmd = ['docker', 'build', '-t', tag, dockerfile_option, '.']
    timeout = settings['build']['timeout']

    LOGGER.info(' '.join(cmd))

    subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                          timeout=timeout, cwd=student['directory'])
    LOGGER.info('Completed a Docker build for student %s', student['username'])
项目:autograder    作者:robertu94    | 项目源码 | 文件源码
def clean_hg(settings, student):
    """
    cleans a Mercurial repository with the latest submission

    Purges untracked files and directories, then discards changes to
    tracked files, both inside the student's directory. Each command uses
    the configured clean timeout, or 5 seconds when that is 0.
    """
    LOGGER.info("Using hg to clean up the directory for student %s", student['username'])

    commands = (
        #Remove all untracked files and directories
        #Override all options to enable the purge extension to clean the directory
        "hg --config='extensions.purge=' purge --all --dirs --files",
        #Undo any changes to tracked files made since the last commit
        "hg update -C",
    )
    for cmd in commands:
        timeout = int(settings['clean']['timeout']) or 5
        subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                              shell=True, timeout=timeout, cwd=student['directory'])
项目:python-cireqs    作者:trustpilot    | 项目源码 | 文件源码
def docker_kill_and_remove(ctr_name):
    """Kill the named container if it is running and remove it.

    ``docker kill`` is issued only when ``docker top`` succeeds, and
    ``docker rm`` only when ``docker images -q`` prints something for the
    name. Failures of the probing commands are ignored; a failure of the
    kill or rm command is logged.
    """
    try:
        try:
            check_output(['docker', 'top', ctr_name], stderr=DEVNULL)
        except Exception:
            # Not running (or docker unavailable): nothing to kill.
            pass
        else:
            check_output(['docker', 'kill', ctr_name], stderr=STDOUT)

        try:
            has_image = check_output(['docker', 'images', '-q', ctr_name], stderr=DEVNULL)
        except Exception:
            has_image = b''
        # Explicit test instead of ``assert``, which is stripped under -O.
        if has_image:
            check_output(['docker', 'rm', ctr_name], stderr=STDOUT)
    except Exception:
        logger.error('could not stop docker container:{}'.format(ctr_name))
项目:Dahua-Firmware-Mod-Kit    作者:BotoX    | 项目源码 | 文件源码
def Handle_SquashFS(self, Key):
        """Extract a SquashFS image with ``unsquashfs``.

        The image ``<DestDir>/<Key>`` is unpacked into a sibling directory
        named after the image with a trailing ``.raw`` replaced by
        ``.extracted``.

        :return: 1 if ``self.CheckDependency`` reports a problem, otherwise
            the exit status of the extraction command
        """
        Path = os.path.join(self.DestDir, Key)
        # str.rstrip(".raw") would strip any trailing '.', 'r', 'a' or 'w'
        # characters, not the suffix, so remove the suffix explicitly.
        Suffix = ".raw"
        BasePath = Path[:-len(Suffix)] if Path.endswith(Suffix) else Path
        DestDir = BasePath + ".extracted"

        Binary = "unsquashfs"
        if self.CheckDependency(Binary):
            return 1

        # Need root to preserve permissions.
        Command = ["sudo", Binary, "-d", DestDir, Path]
        if self.Debug:
            self.Logger.debug(' '.join(Command))
            Result = subprocess.call(Command)
        else:
            Result = subprocess.call(Command, stdout=subprocess.DEVNULL)

        return Result
项目:Dahua-Firmware-Mod-Kit    作者:BotoX    | 项目源码 | 文件源码
def Handle_CramFS(self, Key):
        """Extract a CramFS image with ``cramfsck``.

        The image ``<DestDir>/<Key>`` is unpacked into a sibling directory
        named after the image with a trailing ``.raw`` replaced by
        ``.extracted``.

        :return: 1 if ``self.CheckDependency`` reports a problem, otherwise
            the exit status of the extraction command
        """
        Path = os.path.join(self.DestDir, Key)
        # str.rstrip(".raw") would strip any trailing '.', 'r', 'a' or 'w'
        # characters, not the suffix, so remove the suffix explicitly.
        Suffix = ".raw"
        BasePath = Path[:-len(Suffix)] if Path.endswith(Suffix) else Path
        DestDir = BasePath + ".extracted"

        Binary = "cramfsck"
        if self.CheckDependency(Binary):
            return 1

        # Need root to preserve permissions.
        Command = ["sudo", Binary, "-x", DestDir, Path]
        if self.Debug:
            self.Logger.debug(' '.join(Command))
            Result = subprocess.call(Command)
        else:
            Result = subprocess.call(Command, stdout=subprocess.DEVNULL)

        return Result
项目:Dahua-Firmware-Mod-Kit    作者:BotoX    | 项目源码 | 文件源码
def Handle_CramFS(self, Key):
        """Build a CramFS image from an extracted directory with ``mkcramfs``.

        Packs ``<Source>/<Key>.extracted`` into ``<BuildDir>/<Key>.raw``.

        :return: 1 if ``self.CheckDependency`` reports a problem, otherwise
            the exit status of the packing command
        """
        ExtractedDir = os.path.join(self.Source, Key + ".extracted")
        OrigPath = os.path.join(self.Source, Key + ".raw")  # not used below
        DestPath = os.path.join(self.BuildDir, Key + ".raw")

        Binary = "mkcramfs"
        if self.CheckDependency(Binary):
            return 1

        # Need root to access all files.
        Command = ["sudo", Binary, ExtractedDir, DestPath]
        if not self.Debug:
            return subprocess.call(Command, stdout=subprocess.DEVNULL)

        self.Logger.debug(' '.join(Command))
        return subprocess.call(Command)
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def _get_gitinfo():
    """Return ``(branch, commit)`` for the checkout containing this file.

    Both values come from ``git rev-parse`` run in this file's directory.
    ``('master', '')`` is returned when no git executable is found or when
    either git call exits with a non-zero status.
    """
    import pygrunt.platform as platform
    import subprocess
    from pathlib import Path

    fallback = ('master', '')

    git = platform.current.find_executable('git')
    if git is None:
        # No git installed; assume we're on master
        return fallback

    repo_dir = str(Path(__file__).parent)

    def rev_parse(*options):
        # Run `git rev-parse <options> HEAD`, capturing stdout as text.
        return subprocess.run(
            [git, 'rev-parse', *options, 'HEAD'],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            cwd=repo_dir,
            universal_newlines=True)

    answers = []
    # First the symbolic branch name, then the full commit hash.
    for options in (('--abbrev-ref',), ()):
        finished = rev_parse(*options)
        if finished.returncode != 0:
            # Quietly return defaults on fail
            return fallback
        answers.append(finished.stdout.strip())

    return tuple(answers)
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def removeDirtyDir(self):
        """Delete the configured build directory with ``rm -rf`` if it exists.

        Nothing is printed or executed when
        ``self.cmake_build_info["build_dir"]`` is not a directory.
        """
        build_dir = self.cmake_build_info["build_dir"]
        if not build_dir.is_dir():
            return

        print("Cleaning up Build Directory")
        # Both output streams of `rm` are discarded.
        silenced = dict(stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        subprocess.call(["rm", "-rf", str(build_dir)], **silenced)
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def removeOldCMakeFiles(self):
        """Remove the old CMake directory and any leftover old CMake files.

        The directory ``self.cmake_build_info["old_cmake_dir"]`` is removed
        with ``rm -rf`` when it exists; every entry of
        ``self.cmake_build_info["old_cmake_files"]`` that is a regular file is
        removed with ``rm``.
        """
        info = self.cmake_build_info
        # Both output streams of `rm` are discarded.
        silenced = dict(stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        old_dir = info["old_cmake_dir"]
        if old_dir.is_dir():
            print("Cleaning up Old CMake Directory")
            subprocess.call(["rm", "-rf", str(old_dir)], **silenced)

        for stale in info["old_cmake_files"]:
            if not stale.is_file():
                continue
            print("Cleaning up Old CMake Files")
            subprocess.call(["rm", str(stale)], **silenced)
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def setup_rtags_daemon(self):
        """Run the configured RTags shutdown command, then the daemon command.

        A non-zero exit of ``self.cmake_cmd_info["rtags_shutdwn"]`` is reported
        and otherwise ignored.  A non-zero exit of
        ``self.cmake_cmd_info["rdm_cmd"]`` is reported and re-raised.

        Raises:
            subprocess.CalledProcessError: if the ``rdm_cmd`` command fails.
        """
        print("Initializing RTags Daemon")
        try:
            subprocess.check_output(
                self.cmake_cmd_info["rtags_shutdwn"],
                stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            # check_output captured stdout, so there is output to show.
            print(e.output)

        try:
            subprocess.check_call(
                self.cmake_cmd_info["rdm_cmd"], stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            # check_call captures nothing, so e.output is always None here;
            # print the exception itself (command and exit status) instead.
            print(e)
            raise
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def connect_rtags_client(self):
        """Run the configured ``rc_cmd`` once the compilation database exists.

        Prints an error and returns when
        ``self.cmake_build_info["comp_data_cmake"]`` is not a file.  Otherwise
        the command is run with its output discarded, and a notice is printed
        when it exits with a non-zero status.
        """
        print("Connecting RTags Client")

        if not self.cmake_build_info["comp_data_cmake"].is_file():
            print("Error Generating Compilation Database With CMake")
            return

        status = subprocess.call(
            self.cmake_cmd_info["rc_cmd"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)
        if status != 0:
            print("Info: RTags Daemon Not Running")
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def rtags_set_file(self, arg):
        """Run the configured ``rtags_file`` command with ``arg`` appended.

        ``arg`` is joined onto ``self.cmake_cmd_info["rtags_file"]`` with
        ``+``, so it must be the same sequence type as that command
        (NOTE(review): presumably a list of strings; confirm against callers).
        The command's output is discarded and its exit status is ignored.
        """
        # subprocess.call never raises CalledProcessError (it returns the exit
        # status), so the handler that used to wrap this call was unreachable.
        subprocess.call(
            self.cmake_cmd_info["rtags_file"] + arg,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def update_rtags_buffers(self, args):
        """Pass the C/C++ buffers among ``args`` to the ``rtags_buffer`` command.

        A buffer is selected when its ``str()`` form ends in ``.cpp``, ``.cc``,
        ``.c``, ``.h`` or ``.hpp``.  The selected buffers are appended, in
        order, to ``self.cmake_cmd_info["rtags_buffer"]`` and the resulting
        command is run with its output discarded; its exit status is ignored.
        """
        # The previous test compared the last four characters against the
        # extension list, which can never match the shorter '.cc', '.c' and
        # '.h'.  str.endswith with a tuple handles every length.
        cpp_suffixes = ('.cpp', '.cc', '.c', '.h', '.hpp')
        cpp_buffers = [
            candidate for candidate in args
            if str(candidate).endswith(cpp_suffixes)
        ]

        # subprocess.call never raises CalledProcessError (it returns the exit
        # status), so the handler that used to wrap this call was unreachable.
        subprocess.call(
            self.cmake_cmd_info["rtags_buffer"] + cpp_buffers,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)

    #@classmethod
项目:cmake.nvim    作者:phillipbonhomme    | 项目源码 | 文件源码
def rtags_tagrun(self, cmd):
        """Run ``cmd`` and turn its tab-separated output into formatted tags.

        Each non-empty output line is split on tabs, the resulting fields are
        reversed, and the reversed list is passed to ``self.format_rtags``.

        NOTE(review): the earlier body could not run (undefined ``split`` and
        ``reverse`` names, slice assignment to a ``map`` object, ``tagrun``
        unbound after a failure).  The split/reverse/format order is kept from
        it; skipping blank lines and returning ``[]`` on failure are
        assumptions to confirm.

        Returns:
            list: one ``format_rtags`` result per non-empty output line, or an
            empty list when ``cmd`` exits with a non-zero status.
        """
        try:
            # universal_newlines=True yields str, so the splits below work
            # with str separators.
            tagrun = subprocess.check_output(
                cmd, stderr=subprocess.DEVNULL, universal_newlines=True)
        except subprocess.CalledProcessError as e:
            print(e.output)
            return []

        tags = []
        for line in tagrun.splitlines():
            if not line:
                continue
            fields = line.split("\t")
            fields.reverse()
            tags.append(self.format_rtags(fields))
        return tags
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, traceback):
        """Run the upload command on a clean exit and always delete the file.

        When the ``with`` body finished without an exception,
        ``self.uploadCmd`` is run through ``/bin/bash -ec`` in ``/tmp`` with an
        environment limited to the variables named in ``self.whiteList`` plus
        ``BOB_LOCAL_ARTIFACT`` and ``BOB_REMOTE_ARTIFACT``.  ``self.name`` is
        unlinked in every case.

        Returns False, so an exception from the ``with`` body is not
        suppressed.  Raises ArtifactUploadError when the command exits with a
        non-zero status.
        """
        try:
            if exc_type is not None:
                # Body failed: skip the upload; the finally clause still runs.
                return False

            env = {}
            for key, value in os.environ.items():
                if key in self.whiteList:
                    env[key] = value
            env["BOB_LOCAL_ARTIFACT"] = self.name
            env["BOB_REMOTE_ARTIFACT"] = self.remoteName

            status = subprocess.call(
                ["/bin/bash", "-ec", self.uploadCmd],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                cwd="/tmp",
                env=env)
            if status != 0:
                raise ArtifactUploadError("command return with status {}".format(status))
            return False
        finally:
            os.unlink(self.name)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def predictLiveBuildId(self):
        """Predict the commit id this checkout would resolve to.

        With a configured commit, its hex string is converted to bytes and
        returned directly.  Otherwise ``git ls-remote`` is asked for the
        configured tag (peeled ref first, then the tag ref itself) or branch.

        Returns a one-element list holding the commit id as ``bytes``, or
        ``[None]`` when git fails, prints nothing, or none of the wanted refs
        is in its output.
        """
        if self.__commit:
            return [ bytes.fromhex(self.__commit) ]

        if self.__tag:
            # Annotated tags are objects themselves. We need the commit object!
            tagRef = "refs/tags/" + self.__tag
            refs = [tagRef + '^{}', tagRef]
        else:
            refs = ["refs/heads/" + self.__branch]

        try:
            listing = subprocess.check_output(
                ['git', 'ls-remote', self.__url] + refs,
                universal_newlines=True,
                stderr=subprocess.DEVNULL).strip()
        except subprocess.CalledProcessError:
            return [None]

        # have we found anything at all?
        if not listing:
            return [None]

        # Git prints one "<sha1>\t<refname>" line per ref.  Collect them keyed
        # by refname and ignore lines that do not have exactly two columns.
        found = {}
        for line in listing.split('\n'):
            columns = line.split('\t')
            if len(columns) != 2:
                continue
            found[columns[1].strip()] = bytes.fromhex(columns[0].strip())

        # The first wanted ref that git reported wins.
        for ref in refs:
            if ref in found:
                return [found[ref]]

        # uhh, should not happen...
        return [None]
项目:wr-lx-setup    作者:Wind-River    | 项目源码 | 文件源码
def load_mirror_index(self, remote_mirror):
        """Fetch the mirror's index branch into ``<conf_dir>/mirror-index``.

        The mirror is probed with ``git ls-remote``, first at
        ``remote_mirror`` and then at ``remote_mirror + "/.git"``.  When one
        of them answers, the local index repository is created if missing,
        ``self.base_branch`` is fetched into it, checked out and hard-reset.

        Returns the path of the local index directory, or None when neither
        location answers or the fetch fails.
        """
        # See if there is a mirror index available from the BASE_URL
        mirror_index = os.path.join(self.conf_dir, 'mirror-index')

        def probe(url):
            # Silent `git ls-remote` for the base branch at url.
            cmd = [self.tools['git'], 'ls-remote', url, self.base_branch]
            utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=self.project_dir, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)

        def git_in_index(*arguments):
            # Run a git sub-command inside the local mirror index directory.
            cmd = [self.tools['git']] + list(arguments)
            utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=mirror_index)

        # NOTE(review): the bare excepts also trap KeyboardInterrupt and
        # SystemExit.  They are kept because the exception run_cmd raises on
        # failure is not visible from this block.
        try:
            probe(remote_mirror)
        except:
            try:
                remote_mirror += "/.git"
                probe(remote_mirror)
            except:
                # No mirror, return
                return None

        logger.plain('Loading the mirror index from %s (%s)...' % (remote_mirror, self.base_branch))
        # This MIGHT be a valid mirror..
        if not os.path.exists(mirror_index):
            os.makedirs(mirror_index)
            git_in_index('init')

        try:
            git_in_index('fetch', '-f', '-n', '-u', remote_mirror, self.base_branch + ':' + self.base_branch)
        except:
            # Could not fetch, return
            return None

        logger.debug('Found mirrored index.')
        git_in_index('checkout', self.base_branch)
        git_in_index('reset', '--hard')

        return mirror_index
项目:petronia    作者:groboclown    | 项目源码 | 文件源码
def exec_cmd(bus, *cmd_line):
    r"""
    Executes an external process using the given arguments.  Note that the
    backslash (`\`) character must be escaped, because they are within a Python
    string, so a total of 2 backslashes to equal 1 real backslash.

    For example, to run a command prompt with fancy colors and start it in the
    root of the file system when you press <kbd>F1</kbd>, use:

    ```
    'f1 => cmd cmd.exe /c start cmd.exe /E:ON /V:ON /T:17 /K cd \\'
    ```

    The process is started and not waited for; its standard output and
    standard error are discarded.

    :param bus: object whose `fire` method is called with a log-info event
        carrying the new process id and the command line.
    :param cmd_line: The command-line string equivalent to run.
    :return: None
    """
    # The cmd_line is a space-split set of arguments.  Because we're running a
    # command line, and should allow all kinds of inputs, we'll join it back
    # together.
    full_cmd = " ".join(cmd_line)

    # We won't use shlex, because, for Windows, splitting is just unnecessary.
    print('CMD Running `' + full_cmd + '`')
    # TODO look at making this a bus command, or at least a bus announcement.
    proc = subprocess.Popen(full_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    bus.fire(event_ids.LOG__INFO, target_ids.LOGGER, {
        'message': '{0}: {1}'.format(proc.pid, full_cmd)
    })

```

项目:Projects    作者:SilverLuke    | 项目源码 | 文件源码
def get_last_update(self, path):
        """Return the modification time of ``.git/FETCH_HEAD`` under ``path``.

        The value is the first 19 characters printed by ``stat -c %y``, i.e.
        ``YYYY-MM-DD HH:MM:SS``.

        The command runs with ``cwd=path`` rather than ``os.chdir(path)``, so
        the calling process's working directory is left untouched.

        Raises:
            subprocess.CalledProcessError: if ``stat`` exits with a non-zero
                status (for example when the file does not exist).
        """
        cmd = ["stat", "-c", "%y", ".git/FETCH_HEAD"]
        # Text mode avoids slicing the repr of a bytes object ("b'...'").
        output = subprocess.check_output(
            cmd, stderr=subprocess.DEVNULL, cwd=path, universal_newlines=True)
        return output[:19]
项目:crema    作者:bmcfee    | 项目源码 | 文件源码
def git_version():
    '''Return the git revision as a string

    Returns
    -------
    git_version : str
        The abbreviated hash of ``HEAD``, or ``'UNKNOWN'`` when git exits
        with an error or cannot be started at all.
    '''
    def _minimal_ext_cmd(cmd):
        # Run cmd with a minimal environment: only SYSTEMROOT and PATH are
        # inherited (when set), and the locale is forced to "C".
        env = {
            name: os.environ[name]
            for name in ('SYSTEMROOT', 'PATH')
            if name in os.environ
        }
        # LANGUAGE is used on win32
        env['LANGUAGE'] = 'C'
        env['LANG'] = 'C'
        env['LC_ALL'] = 'C'

        return subprocess.check_output(cmd,
                                       stderr=subprocess.DEVNULL,
                                       env=env)

    try:
        out = _minimal_ext_cmd(['git', 'rev-parse', '--verify', '--quiet', '--short', 'HEAD'])
        GIT_REVISION = out.strip().decode('ascii')
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable git binary, which would
        # otherwise escape as FileNotFoundError/PermissionError.
        GIT_REVISION = 'UNKNOWN'

    return GIT_REVISION