我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用shutil.which()。
def assert_installed(win=None, modules=[], binaries=[]): missing = defaultdict(list) for items, what, func in ((modules, 'modules', find_spec), (binaries, 'binaries', which)): for item in items: if not func(item): missing[what].append(item) if missing: msg = [] for subject, names in missing.items(): if len(names) == 1: subject = {'modules': 'module', 'binaries': 'binary'}[subject] msg.append('%s "%s"' % (subject, '", "'.join(names))) msg = 'You are missing the %s for this.' % ' and '.join(msg) if win: from PyQt5.QtWidgets import QMessageBox QMessageBox.warning(win, ' ', msg) else: raise ImportError(msg) return not missing
def get_media_info(self, media_file): try: ffprobe = shutil.which('ffprobe') except Exception: # python2 from distutils.spawn import find_executable ffprobe = find_executable('ffprobe') if ffprobe: cmd = "ffprobe -v error -select_streams v -show_entries stream=width,height,avg_frame_rate,duration -of default=noprint_wrappers=1 -print_format json %s" % media_file result = subprocess.check_output(cmd.split(' '), universal_newlines=True) vjres = json.loads(result)['streams'][0] if not vjres.get('duration'): cmd = "ffprobe -v error -select_streams v -show_format_entry duration -of default=noprint_wrappers=1 -print_format json %s" % media_file result = subprocess.check_output(cmd.split(' '), universal_newlines=True) vjres['duration'] = json.loads(result)['format']['duration'] cmd = "ffprobe -v error -select_streams a -show_entries stream=sample_rate -of default=noprint_wrappers=1 -print_format json %s" % media_file result = subprocess.check_output(cmd.split(' '), universal_newlines=True) ajres = json.loads(result)['streams'][0] vjres['sample_rate'] = ajres['sample_rate'] return vjres else: logger.error('ffprobe is required') sys.exit()
def run_cmd(pl, cmd, stdin=None, strip=True): '''Run command and return its stdout, stripped If running command fails returns None and logs failure to ``pl`` argument. :param PowerlineLogger pl: Logger used to log failures. :param list cmd: Command which will be run. :param str stdin: String passed to command. May be None. :param bool strip: True if the result should be stripped. ''' try: p = Popen(cmd, shell=False, stdout=PIPE, stdin=PIPE) except OSError as e: pl.exception('Could not execute command ({0}): {1}', e, cmd) return None else: stdout, err = p.communicate( stdin if stdin is None else stdin.encode(get_preferred_output_encoding())) stdout = stdout.decode(get_preferred_input_encoding()) return stdout.strip() if strip else stdout
def createProject( self, project ): tm = self.table_view.table_model self.all_visible_table_columns = (tm.col_status, tm.col_name, tm.col_date) if shutil.which( hglib.HGPATH ) is None: self.log.error( T_('Mercurial "hg" command line tool not found') ) return None try: return wb_hg_project.HgProject( self.app, project, self ) except hglib.error.ServerError as e: self.app.log.error( T_('Failed to add Hg repo %s') % (project.path,) ) self.app.log.error( T_('hg error: %s') % (e,) ) return None #------------------------------------------------------------
def find_editor(): for var in 'GIT_EDITOR', 'EDITOR': editor = os.environ.get(var) if editor is not None: return editor if sys.platform == 'win32': fallbacks = ['notepad.exe'] else: fallbacks = ['/etc/alternatives/editor', 'nano'] for fallback in fallbacks: if os.path.isabs(fallback): found_path = fallback else: found_path = shutil.which(fallback) if found_path and os.path.exists(found_path): return found_path error('Could not find an editor! Set the EDITOR environment variable.')
def check_executable(executable): """ Checks if an executable exists in $PATH. Arguments: executable -- the name of the executable (e.g. "echo") Returns: True or False """ logger = logging.getLogger(__name__) logger.debug("Checking executable '%s'...", executable) executable_path = find_executable(executable) found = executable_path is not None if found: logger.debug("Executable '%s' found: '%s'", executable, executable_path) else: logger.debug("Executable '%s' not found", executable) return found
def _get_chromedriver_path() -> str: """Get path to chromedriver executable. Usually it is on the project root. """ chromedriver_path = shutil.which('chromedriver') if chromedriver_path: return chromedriver_path if 'TRAVIS' in os.environ: chromedriver_path = os.path.join( os.environ['TRAVIS_BUILD_DIR'], 'chromedriver') else: chromedriver_path = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'chromedriver' ) return chromedriver_path # see https://www.spirulasystems.com/blog/2016/08/11/https-everywhere-unit-testing-for-chromium/ # noqa: E501
def updateCheck(): from pathlib import Path if not shutil.which("git"): return # check if new commits are available projectDir = str(Path(__file__).parent) subprocess.call(["git", "fetch"], cwd=projectDir) output = subprocess.check_output(["git", "status", "-uno"], cwd=projectDir) behindIndex = output.find(b"Your branch is behind ") if behindIndex > 0: msgEnd = output.find(b"\n (use \"git pull\" to update your local branch)") if msgEnd > 0: output = output[behindIndex:msgEnd] statusUpdate("Current CheriBuild checkout can be updated: ", output.decode("utf-8")) if input("Would you like to update before continuing? y/[n] (Enter to skip) ").lower().startswith("y"): subprocess.check_call(["git", "pull", "--rebase"], cwd=projectDir) os.execv(sys.argv[0], sys.argv)
def extract_sdk_archives(cheriConfig, archives: "typing.List[SdkArchive]"): if cheriConfig.sdkBinDir.is_dir(): statusUpdate(cheriConfig.sdkBinDir, "already exists, not extracting SDK archives") return cheriConfig.FS.makedirs(cheriConfig.sdkDir) for archive in archives: archive.extract() if not cheriConfig.sdkBinDir.exists(): fatalError("SDK bin dir does not exist after extracting sysroot archives!") # Use the host ar/ranlib if they are missing for tool in ("ar", "ranlib"): if not (cheriConfig.sdkDir / "bin" / tool).exists(): cheriConfig.FS.createSymlink(Path(shutil.which(tool)), cheriConfig.sdkBinDir / tool, relative=False) cheriConfig.FS.createBuildtoolTargetSymlinks(cheriConfig.sdkBinDir / tool)
def __init__(self, config: CheriConfig): super().__init__(config) # set up the install/build/source directories (allowing overrides from config file) self.configureCommand = "" # non-assignable variables: self.make_args = MakeOptions() self.configureArgs = [] # type: typing.List[str] self.configureEnvironment = {} # type: typing.Dict[str,str] if self.config.createCompilationDB and self.compileDBRequiresBear: self._addRequiredSystemTool("bear", installInstructions="Run `cheribuild.py bear`") self._lastStdoutLineCanBeOverwritten = False self._preventAssign = True if self.requiresGNUMake: if IS_LINUX and not shutil.which("gmake"): statusUpdate("Could not find `gmake` command, assuming `make` is GNU make") self.makeCommand = "make" else: self._addRequiredSystemTool("gmake", homebrewPackage="make") self.makeCommand = "gmake" else: self.makeCommand = "make" # Make sure that API is used properly
def getInterpreter(cmdline: "typing.Sequence[str]") -> "typing.Optional[typing.List[str]]": """ :param cmdline: The command to check :return: The interpreter command if the executable does not have execute permissions """ executable = Path(cmdline[0]) print(executable, os.access(str(executable), os.X_OK), cmdline) if not executable.exists(): executable = Path(shutil.which(str(executable))) statusUpdate(executable, "is not executable, looking for shebang:", end=" ") with executable.open("r", encoding="utf-8") as f: firstLine = f.readline() if firstLine.startswith("#!"): interpreter = shlex.split(firstLine[2:]) statusUpdate("Will run", executable, "using", interpreter) return interpreter else: statusUpdate("No shebang found.") return None
def require_command( runner: Runner, command: str, message: Optional[str] = None ): if message is None: message = "Please install " + command try: runner.get_output(["which", command]) except CalledProcessError as e: sys.stderr.write(message + "\n") sys.stderr.write( '(Ran "which {}" to check in your $PATH.)\n'.format(command) ) sys.stderr.write( "See the documentation at https://telepresence.io " "for more details.\n" ) raise SystemExit(1)
def kubectl_or_oc(server: str) -> str: """ Return "kubectl" or "oc", the command-line tool we should use. :param server: The URL of the cluster API server. """ if which("oc") is None: return "kubectl" # We've got oc, and possibly kubectl as well. We only want oc for OpenShift # servers, so check for an OpenShift API endpoint: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE try: with urlopen(server + "/version/openshift", context=ctx) as u: u.read() except HTTPError: return "kubectl" else: return "oc"
def get_env_variables(runner: Runner, remote_info: RemoteInfo, context: str) -> Dict[str, str]: """ Generate environment variables that match kubernetes. """ # Get the environment: remote_env = _get_remote_env( runner, context, remote_info.namespace, remote_info.pod_name, remote_info.container_name ) # Tell local process about the remote setup, useful for testing and # debugging: result = { "TELEPRESENCE_POD": remote_info.pod_name, "TELEPRESENCE_CONTAINER": remote_info.container_name } # Alpine, which we use for telepresence-k8s image, automatically sets these # HOME, PATH, HOSTNAME. The rest are from Kubernetes: for key in ("HOME", "PATH", "HOSTNAME"): if key in remote_env: del remote_env[key] result.update(remote_env) return result
def setUpClass(self): # Getting pass-import module super(TestPass, self).setUpClass() # Pass binary os.environ['PASSWORD_STORE_BIN'] = shutil.which("pass") # GPG Config if 'GPG_AGENT_INFO' in os.environ: os.environ.pop('GPG_AGENT_INFO', None) os.environ['GNUPGHOME'] = os.path.join(os.getcwd(), 'gnupg') # Tests directories self.tmp = os.path.join(self.tmp, self.__name__[8:].lower()) shutil.rmtree(self.tmp, ignore_errors=True) os.makedirs(self.tmp, exist_ok=True)
def generate_iso(self, cleanup=True): self.cdrom_iso_tmp = NamedTemporaryFile(delete=False, dir=self.tmp_dir.name) cdrom_iso = self.cdrom_iso_tmp.name # chmod to be r/w by everyone # so we can remove the file even when qemu takes the ownership tools = { "genisoimage": self.__genisoimage, "mkisofs": self.__mkisofs } available = next(bin for bin in tools.keys() if shutil.which(bin) is not None) # generate iso if available is None: raise Exception('Cannot find tools for creating ISO images') tools[available](cdrom_iso) logging.debug('ISO generated at %s', cdrom_iso) # cleanup if cleanup: self.cdrom_dir_tmp.cleanup() return cdrom_iso
def assemble(outfile, compiler, toolchain, flags=()): """Assembles the solver. Returns the filename that was generated.""" print('Assembling ' + toolchain) base, _ = os.path.splitext(outfile) asmfile = base + '-' + toolchain.lower() + '.s' prefix = os.path.dirname(os.path.dirname(shutil.which(compiler))) include = os.path.join(prefix, 'include') cmd = [compiler, '-I' + include, '-fPIC', '-O0'] cmd.extend(flags) cmd.extend(['-S', '-o', asmfile, '-c', outfile]) print('Running command:\n $ ' + ' '.join(cmd)) t0 = time.time() subprocess.check_call(cmd) t1 = time.time() print('{0} assembled in {1:.3} seconds'.format(toolchain, t1 - t0)) return asmfile
def run_pandoc(text='', args=None): """ Low level function that calls Pandoc with (optionally) some input text and/or arguments """ if args is None: args = [] pandoc_path = which('pandoc') if pandoc_path is None or not os.path.exists(pandoc_path): raise OSError("Path to pandoc executable does not exists") proc = Popen([pandoc_path] + args, stdin=PIPE, stdout=PIPE, stderr=PIPE) out, err = proc.communicate(input=text.encode('utf-8')) exitcode = proc.returncode if exitcode != 0: raise IOError(err) return out.decode('utf-8')
def __setitem__(self, key, value, dict_setitem=dict.__setitem__): 'od.__setitem__(i, y) <==> od[i]=y' # Setting a new item creates a new link which goes at the end of the linked # list, and the inherited dictionary is updated with the new key/value pair. if key not in self: root = self.__root last = root[0] last[1] = root[0] = self.__map[key] = [last, root, key] dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__): 'od.__delitem__(y) <==> del od[y]' # Deleting an existing item uses self.__map to find the link which is # then removed by updating the links in the predecessor and successor nodes. dict_delitem(self, key) link_prev, link_next, key = self.__map.pop(key) link_prev[1] = link_next link_next[0] = link_prev
def fromkeys(cls, iterable, value=None): '''OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S and values equal to v (which defaults to None). ''' d = cls() for key in iterable: d[key] = value return d
def valid_ident(s): m = IDENTIFIER.match(s) if not m: raise ValueError('Not a valid Python identifier: %r' % s) return True # The ConvertingXXX classes are wrappers around standard Python containers, # and they serve to convert any suitable values in the container. The # conversion converts base dicts, lists and tuples to their wrapped # equivalents, whereas strings which match a conversion format are converted # appropriately. # # Each wrapper should have a configurator attribute holding the actual # configurator to use for conversion.
def check_executable(executable: str): location = shutil.which(executable) if location is None: print('`{0}` is not found on your PATH.\n' 'Make sure that `{0}` is installed on your system ' 'and available on the PATH.'.format(executable)) sys.exit(1) else: pass
def extract_zip(data: bytes, path: Path) -> None: """Extract zipped data to path.""" # On mac zipfile module cannot extract correctly, so use unzip instead. if curret_platform() == 'mac': import subprocess import shutil zip_path = path / 'chrome.zip' if not path.exists(): path.mkdir(parents=True) with zip_path.open('wb') as f: f.write(data) if not shutil.which('unzip'): raise OSError('Failed to automatically extract chrome.zip.' f'Please unzip {zip_path} manually.') subprocess.run(['unzip', str(zip_path)], cwd=str(path)) if chromium_excutable().exists() and zip_path.exists(): zip_path.unlink() else: with ZipFile(BytesIO(data)) as zf: zf.extractall(str(path)) exec_path = chromium_excutable() if not exec_path.exists(): raise IOError('Failed to extract chromium.') exec_path.chmod(exec_path.stat().st_mode | stat.S_IXOTH | stat.S_IXGRP | stat.S_IXUSR) logger.warning(f'chromium extracted to: {path}')
def und_tool_exists() -> bool: return shutil.which('und') is not None
def as_tuple(self, value): """Utility function which converts lists to tuples.""" if isinstance(value, list): value = tuple(value) return value