我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.access()。
def _clean_check(cmd, target):
    """
    Execute the download command and tidy up after a failure.

    ``cmd`` is passed to ``subprocess.check_call``. If that call raises
    ``CalledProcessError``, ``target`` is unlinked when it exists and the
    original error is re-raised unchanged.
    """
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        # A failed download may have left a partial file behind.
        leftover_present = os.access(target, os.F_OK)
        if leftover_present:
            os.unlink(target)
        raise
def copy_from_host(module):
    """Read the file named by ``module.params['src']`` and report it.

    Reports failure through ``module.fail_json`` when the file is missing
    or unreadable. Otherwise hands ``module.exit_json`` the base64-encoded
    content (zlib-compressed first when ``params['compress']`` is truthy),
    the SHA-1 hex digest of the raw bytes, the permission bits formatted
    with ``oct``, and the source path.
    """
    params = module.params
    compress = params.get('compress')
    src = params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Keep only the permission bits of the file mode.
    mode = oct(os.stat(src).st_mode & 0o777)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    # The digest always covers the uncompressed bytes.
    sha1 = hashlib.sha1(raw_data).hexdigest()
    if compress:
        data = zlib.compress(raw_data)
    else:
        data = raw_data

    module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode, source=src)
def which(program):
    """Locate an executable and return its path.

    If ``program`` contains a directory component it is checked as given;
    otherwise every entry of the ``PATH`` environment variable is searched.

    Returns:
        The path of the first match that is a regular file with execute
        permission.

    Raises:
        RuntimeError: if no executable is found. The original code used a
            bare ``raise`` here, which on Python 3 surfaces as
            ``RuntimeError("No active exception to reraise")``; the
            exception type is kept but the message now names the program.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, _fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            # PATH entries may be wrapped in double quotes.
            path = path.strip('"')
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file

    raise RuntimeError("program not found: %s" % program)
def populate_memory(self, areas):
    """Map every memory area into the VM and optionally preload its content.

    Args:
        areas: iterable of ``(name, address, size, permission, input_file)``
            tuples. ``input_file`` may be ``None``; when it names a readable
            file, at most ``size`` bytes of it are written at ``address``.

    Returns:
        ``True`` after all areas are mapped.

    Raises:
        KeyError: if no area named ``".text"`` was supplied, since its
            address becomes ``self.start_addr``.
    """
    for name, address, size, permission, input_file in areas:
        perm = self.unicorn_permissions(permission)
        self.vm.mem_map(address, size, perm)
        self.areas[name] = [address, size, permission]

        msg = "Map %s @%x (size=%d,perm=%s)" % (name, address, size, permission)
        if input_file is not None and os.access(input_file, os.R_OK):
            # Use a context manager so the handle is closed deterministically
            # (the previous open(...).read() leaked it).
            with open(input_file, 'rb') as handle:
                code = handle.read()
            self.vm.mem_write(address, bytes(code[:size]))
            msg += " and content from '%s'" % input_file

        self.log(msg, "Setup")

    self.start_addr = self.areas[".text"][0]
    self.end_addr = -1
    return True
def check_path_owner(path):
    """Report whether the current user may be treated as owner of ``path``.

    Walks from ``path`` up through its parents until an existing entry is
    found. For root (effective uid 0) the entry must be owned by uid 0;
    for any other user it must be writable.

    Returns:
        ``True`` or ``False``. When no ancestor of ``path`` exists at all
        the function now returns an explicit ``False`` (it previously fell
        off the loop and returned ``None``).
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # dirname() stopped changing the path and nothing along the way exists:
    # assume we do not own it.
    return False
def __addTooltip(self, item, text, hideWarn=False):
    """Attach a tooltip showing ``text`` to ``item``.

    Returns the ``StringVar`` backing the tooltip text, or ``None`` when
    ``ToolTip`` is unavailable. With ``hideWarn`` set, the unavailable
    warning is skipped and warnings are paused while the tooltip is built.
    """
    self.__loadTooltip()

    if not ToolTip:
        if not hideWarn:
            self.warn("ToolTips unavailable - check tooltip.py is in the lib folder")
        return None

    # turn off warnings about tooltips
    if hideWarn:
        myWarn = self.__pauseWarn()

    var = StringVar(self.topLevel)
    var.set(text)
    item.tooltip = ToolTip(item, delay=500, follow_mouse=1, textvariable=var)

    if hideWarn:
        self.__resumeWarn(myWarn)
    return var

#####################################
# FUNCTIONS to show pop-up dialogs
#####################################
# function to access the last made pop_up
def get_config_file_path(cls):
    """Return the path of the config file for ``cls``.

    Non-global configs live in the current directory. Global configs live
    in ``<home>/.polyaxon`` (``/tmp/.polyaxon`` when the home directory is
    not writable); that directory is created on demand and a failure to
    create it is logged, not raised.
    """
    if cls.IS_GLOBAL:
        home = os.path.expanduser('~')
        root = home if os.access(home, os.W_OK) else '/tmp'
        base_path = os.path.join(root, '.polyaxon')

        if not os.path.exists(base_path):
            try:
                os.makedirs(base_path)
            except OSError:
                # Except permission denied and potential race conditions
                # in multi-threaded environments.
                logger.error('Could not create config directory `{}`'.format(base_path))
    else:
        # local to this directory
        base_path = os.path.join('.')

    return os.path.join(base_path, cls.CONFIG_FILE_NAME)
def _test_NoAccessDir(self, nodeName):
    """Check that loading from a permission-less directory raises LoadFail.

    Launches the device manager for ``nodeName``, makes ``dom/noaccess``
    under the SDR path a mode-0 directory, asserts it is inaccessible and
    that ``device.load`` raises ``CF.LoadableDevice.LoadFail``. The
    directory is removed afterwards in all cases.
    """
    dcd_path = "/nodes/%s/DeviceManager.dcd.xml" % nodeName
    devBooter, devMgr = self.launchDeviceManager(dcd_path)
    device = devMgr._get_registeredDevices()[0]
    fileMgr = self._domMgr._get_fileMgr()

    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # Strip every permission bit, whether or not the directory pre-exists.
    if os.path.exists(testdir):
        os.chmod(testdir, 0)
    else:
        os.mkdir(testdir, 0)

    try:
        can_enter = os.access(testdir, os.R_OK|os.X_OK)
        self.assertFalse(can_enter, 'Current user can still access directory')
        self.assertRaises(CF.LoadableDevice.LoadFail, device.load, fileMgr,
                          dirname, CF.LoadableDevice.SHARED_LIBRARY)
    finally:
        os.rmdir(testdir)
def test_ExistsException(self):
    """Check that ``fileMgr.exists`` raises for a directory it cannot enter.

    Creates (or re-modes) ``dom/noaccess`` under the SDR path with mode
    ``0o644`` - readable but without the execute/search bit - asserts the
    current user cannot access it with ``R_OK|X_OK``, then expects
    ``CF.InvalidFileName`` from ``fileMgr.exists``. The directory is
    removed afterwards in all cases.
    """
    self.assertNotEqual(self._domMgr, None)
    fileMgr = self._domMgr._get_fileMgr()

    # Makes sure that FileSystem::exists() throws correct exception and
    # doesn't kill domain for files in directories it cannot access
    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # ``0o644`` is the octal spelling accepted by both Python 2.6+ and
    # Python 3; the old ``0644`` form is a syntax error on Python 3.
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0o644)
    else:
        os.chmod(testdir, 0o644)

    try:
        self.assertFalse(os.access(testdir, os.R_OK|os.X_OK), 'Current user can still access directory')
        self.assertRaises(CF.InvalidFileName, fileMgr.exists, os.path.join(dirname, 'testfile'))
    finally:
        os.rmdir(testdir)
def check_fastq(fastq):
    """Validate that a FASTQ file is readable and consistently named.

    Reports through ``martian.exit`` when the file is not readable, or when
    its gzip-ness (probed by reading one byte through ``gzip.open``)
    disagrees with the presence of ``cr_constants.GZIP_SUFFIX`` in the
    file name.

    Args:
        fastq: path of the FASTQ file to check.
    """
    # Check if fastq is readable
    if not os.access(fastq, os.R_OK):
        martian.exit("Do not have file read permission for FASTQ file: %s" % fastq)

    # Check if fastq is gzipped
    is_gzip_fastq = True
    try:
        with gzip.open(fastq) as f:
            f.read(1)
    except Exception:
        # Any read failure means "not gzip". Catching Exception instead of
        # using a bare except lets KeyboardInterrupt/SystemExit propagate.
        is_gzip_fastq = False

    has_gzip_suffix = fastq.endswith(cr_constants.GZIP_SUFFIX)

    # The messages expect the suffix first and the file name second; the
    # arguments used to be passed in the opposite order.
    if is_gzip_fastq and not has_gzip_suffix:
        martian.exit("Input FASTQ file is gzipped but filename does not have %s suffix: %s" % (cr_constants.GZIP_SUFFIX, fastq))
    if not is_gzip_fastq and has_gzip_suffix:
        martian.exit("Input FASTQ file is not gzipped but filename has %s suffix: %s" % (cr_constants.GZIP_SUFFIX, fastq))
def safeInstall():
    """Prepare the Factorio directory if it is missing, then run the updater.

    When the directory returned by ``getFactorioPath()`` does not exist it
    is created (directly if the parent is writable, otherwise through
    ``sudo``), ``saves`` and ``config`` subdirectories are added, and the
    shell-completion line is appended to ``~/.bashrc`` if not already
    present. ``updateFactorio()`` is then called. An ``IOError`` anywhere in
    that sequence prints a message and exits with status 1.

    NOTE(review): the nesting below was reconstructed from a source whose
    line breaks were lost; confirm that the ``saves``/``config``/``.bashrc``
    steps belong inside the "directory is missing" branch.
    """
    FACTORIOPATH = getFactorioPath()
    try:
        if not os.path.isdir("%s" % (FACTORIOPATH) ):
            # Create the directory ourselves when the parent is writable,
            # otherwise create it via sudo and hand ownership to this user.
            if os.access("%s/.." % (FACTORIOPATH), os.W_OK):
                os.mkdir(FACTORIOPATH, 0o777)
            else:
                subprocess.call(['sudo', 'mkdir', '-p', FACTORIOPATH])
                subprocess.call(['sudo', 'chown', getpass.getuser(), FACTORIOPATH])
            os.mkdir(os.path.join(FACTORIOPATH, "saves"))
            os.mkdir(os.path.join(FACTORIOPATH, "config"))
            # "r+" requires ~/.bashrc to exist already; after read() the
            # file position is at the end, so write() appends.
            with open("%s/.bashrc" % (os.path.expanduser("~")), "r+") as bashrc:
                lines = bashrc.read()
                if lines.find("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n") == -1:
                    bashrc.write("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n")
                    print("You'll want to restart your shell for command autocompletion. Tab is your friend.")
        updateFactorio()
    except IOError as e:
        print("Cannot make %s. Please check permissions. Error %s" % (FACTORIOPATH, e))
        sys.exit(1)
def __init__(self, dir, file_template=_default_file_template,
             truncate_slug_length=40, version_locations=None,
             sourceless=False, output_encoding="utf-8"):
    """Store the script-directory settings and verify the directory exists.

    A falsy ``truncate_slug_length`` is replaced by 40. The revision map is
    built from ``self._load_revisions`` before the existence check.

    Raises:
        util.CommandError: if ``dir`` does not exist.
    """
    if not truncate_slug_length:
        truncate_slug_length = 40

    self.dir = dir
    self.file_template = file_template
    self.version_locations = version_locations
    self.truncate_slug_length = truncate_slug_length
    self.sourceless = sourceless
    self.output_encoding = output_encoding
    self.revision_map = revision.RevisionMap(self._load_revisions)

    if os.access(dir, os.F_OK):
        return

    raise util.CommandError("Path doesn't exist: %r. Please use "
                            "the 'init' command to create a new "
                            "scripts folder." % dir)
def __init__(self, certstore):
    """
    Remember where the CA certificates are stored.

    certstore - path to the file used to store
                CA certificates
                eg /etc/apache/ssl.crt/ca-bundle.crt

    Nothing is opened here; ``_smime`` starts out as None.

    >>> v = Verifier('/etc/dummy.crt')
    >>> v.verify('pippo')
    Traceback (most recent call last):
      File "/usr/lib/python2.3/doctest.py", line 442, in _run_examples_inner
        compileflags, 1) in globs
      File "<string>", line 1, in ?
      File "verifier.py", line 46, in verify
        self._setup()
      File "verifier.py", line 36, in _setup
        raise VerifierError, "cannot access %s" % self._certstore
    VerifierError: cannot access /etc/dummy.crt
    >>>
    """
    self._certstore, self._smime = certstore, None
def main(argv):
    """Print a coloured listing of the directory named by ``argv[1]``.

    Each entry is printed with its size and modification time; directories
    use one ANSI colour, executable entries another, everything else none.
    """
    dir_ = argv[1]
    reset = '\033[0m'

    for entry in sorted(os.listdir(dir_)):
        path = os.path.join(dir_, entry)
        size = os.path.getsize(path)
        mtime = os.path.getmtime(path)
        mtime_format = time.strftime("%b %d %H:%M", time.localtime(mtime))

        # Directories take precedence over the executable check.
        if os.path.isdir(path):
            color = '\033[1;94m'
        elif os.access(path, os.X_OK):
            color = '\033[1;92m'
        else:
            color = ''

        line = "%6d %13s %s%s%s" % (size, mtime_format, color, entry, reset)
        senf.print_(line)
def check_exe(name, env=None):
    """
    Look up an executable by name or path.

    :type name: string
    :param name: name or path to program
    :param env: mapping providing ``PATH``; ``os.environ`` when falsy
    :return: absolute path of the program, or None when it is not found
    :raises ValueError: if ``name`` is empty
    """
    if not name:
        raise ValueError('Cannot execute an empty string!')

    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    directory = os.path.split(name)[0]
    if directory and is_exe(name):
        return os.path.abspath(name)

    # Fall back to a PATH search, also for a path-like name that was not
    # itself executable.
    search_env = env or os.environ
    for entry in search_env["PATH"].split(os.pathsep):
        candidate = os.path.join(entry.strip('"'), name)
        if is_exe(candidate):
            return os.path.abspath(candidate)

    return None
def _test_resource_paths(my):
    """Check the resource paths reported by the server for the admin login.

    Verifies that the admin resource path mentions ``etc/admin.tacticrc``,
    that the path for the current system user is among the created resource
    paths, and that the server login is ``admin``.
    """
    path = my.server.get_resource_path('admin')
    # not a very accurate test
    my.assertEquals(True, 'etc/admin.tacticrc' in path)

    paths = my.server.create_resource_paths()
    sys_login = getpass.getuser()

    home = my.server.get_home_dir()
    is_dir_writeable = os.access(home, os.W_OK) and os.path.isdir(home)
    # Expected location: under a writable home directory, otherwise an
    # OS-specific fallback.
    if home and is_dir_writeable:
        etc_dir = "%s/.tactic/etc" % home
    elif os.name == 'nt':
        etc_dir = 'C:/sthpw/etc'
    else:
        etc_dir = '/tmp/sthpw/etc'

    expected = '%s/%s.tacticrc' % (etc_dir, sys_login)
    compared = expected in paths
    my.assertEquals(True, compared)

    # since we use admin to get resource path , my.login should also be admin
    my.assertEquals('admin', my.server.get_login())
def find_boot_files(name, shortname, basedir):
    """Locate a boot file (vmlinuz or initrd) below ``basedir``.

    An explicit ``name`` is used as-is when absolute, else relative to
    ``<basedir>/boot``. Without a name, the first readable
    ``<basedir>/<shortname>*`` match is resolved with ``realpath``;
    failing that, the last (sorted) ``<basedir>/boot/<shortname>*`` match
    is used, with a warning when several exist.

    Exits through ``sys.exit`` when nothing is found or the result is not
    readable; otherwise returns the path.
    """
    if name:
        if name[0] == '/':
            fullpath = name
        else:
            fullpath = basedir + '/boot/' + name
    else:
        # try the (only) symlink at the root directory
        try1 = basedir + '/' + shortname + '*'
        root_matches = sorted(glob.glob(try1))
        if len(root_matches) >= 1 and os.access(root_matches[0], os.R_OK):
            fullpath = os.path.realpath(root_matches[0])
        else:
            # try the highest numbered version at /boot
            try2 = basedir + '/boot/' + shortname + '*'
            boot_matches = sorted(glob.glob(try2))
            if len(boot_matches) < 1:
                sys.exit('cannot read ' + try1 + ' and cannot find ' + try2)
            fullpath = boot_matches[-1]
            if len(boot_matches) > 1:
                warnings.warn('found more than one ' + try2 + ' , using ' + fullpath)

    if os.access(fullpath, os.R_OK):
        return fullpath
    sys.exit('failed to read ' + fullpath)
def which(program):
    """Return the path of ``program`` if it is an executable file, else None.

    A name with a directory component is checked as given; a bare name is
    searched for in every entry of the ``PATH`` environment variable.
    """
    import os

    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    directory = os.path.split(program)[0]
    if directory:
        return program if is_exe(program) else None

    for entry in os.environ["PATH"].split(os.pathsep):
        candidate = os.path.join(entry.strip('"'), program)
        if is_exe(candidate):
            return candidate
    return None

#source: https://stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file
def check_job_path(jobs_dir, job):
    """Validate the ``path`` attribute of a job description.

    The path must be a relative string that resolves to an executable
    regular file located inside ``jobs_dir``. As a side effect,
    ``job['profiles']`` is set to ``None`` when the key is absent.

    Args:
        jobs_dir: directory that roots the job tree.
        job: mapping describing the job; mutated as described above.

    Raises:
        Exception: with a descriptive message for every failed check.
    """
    # ``basestring`` only exists on Python 2; fall back to ``str`` on 3.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    if 'path' not in job:
        raise Exception('job does not have the "path" attribute: %s' % job)
    if not isinstance(job['path'], string_types):
        raise Exception('"path" attribute is not a string: %s' % job)

    # check that 'path' is rooted in the same directory as the .vcsjob file
    if job['path'].startswith('/'):
        raise Exception('job path is not relative: %s' % job['path'])
    path = os.path.normpath(os.path.join(jobs_dir, job['path']))
    # A plain string-prefix test would accept siblings such as
    # "<jobs_dir>_evil/run"; compare on path components instead.
    relative = os.path.relpath(path, jobs_dir)
    if relative == os.pardir or relative.startswith(os.pardir + os.sep):
        raise Exception('job path is not inside the job tree: %s' % path)

    # also check that the job exists and is runnable
    if not os.path.isfile(path):
        raise Exception('no such file: %s' % path)
    if not os.access(path, os.X_OK):
        raise Exception('file is not executable: %s' % path)

    if 'profiles' not in job:
        job['profiles'] = None
def libvlc_vlm_show_media(p_instance, psz_name):
    '''Describe the named media as a JSON string.

    Mainly intended for debugging. For programmatic access to the state of
    a vlm_media_instance_t, use the corresponding
    libvlc_vlm_get_media_instance_xxx functions instead; no such functions
    currently exist for vlm_media_t.
    @param p_instance: the instance.
    @param psz_name: the name of the media, if the name is an empty string, all media is described.
    @return: string with information about named media, or None on error.
    '''
    # Reuse the registered binding when there is one, else create it.
    f = _Cfunctions.get('libvlc_vlm_show_media', None)
    if not f:
        f = _Cfunction('libvlc_vlm_show_media', ((1,), (1,),), string_result,
                       ctypes.c_void_p, Instance, ctypes.c_char_p)
    return f(p_instance, psz_name)
def getTasksProcForce(self) :
    """
    Get tasks list by bruteforcing /proc

    Probes ``/proc/<i>/status`` for every ``i`` from 1 to 65534 and records
    each hit in ``self.tasks_procforce`` (``map_tasks`` and ``list_tasks``).
    For every recorded entry, the sub-entries of ``/proc/<i>/task`` other
    than the entry itself are recorded as well, with a trailing flag of 1
    instead of 0.

    NOTE(review): the nesting below was reconstructed from a source whose
    line breaks were lost; the ``task`` scan is placed inside the
    ``l != []`` branch because it indexes ``l[0]`` - confirm.
    """
    for i in range(1, 65535, 1) :
        if(os.access("/proc/" + str(i) + "/status", os.F_OK) == True):
            # presumably returns [id, <int>, <int>, <name>] or [] on
            # failure - verify against openProcTaskStatus
            l = self.openProcTaskStatus("/proc/" + str(i) + "/status")
            if(l != []):
                self.tasks_procforce.map_tasks[int(l[0])] = [int(l[1]), int(l[2]), l[3], 0, 0]
                self.tasks_procforce.list_tasks.append(int(l[0]))
                if(os.access("/proc/" + str(i) + "/task", os.F_OK) == True):
                    for srep in os.listdir("/proc/" + str(i) + "/task"):
                        # skip the sub-entry whose name equals the id
                        # already recorded above
                        if(srep != l[0]):
                            # NOTE(review): ll is indexed without the empty
                            # check applied to l above - confirm
                            # openProcTaskStatus cannot return [] here
                            ll = self.openProcTaskStatus("/proc/" + str(i) + "/task/" + srep + "/status")
                            self.tasks_procforce.map_tasks[int(ll[0])] = [int(ll[1]), int(ll[2]), ll[3], 0, 1]
                            self.tasks_procforce.list_tasks.append(int(ll[0]))
def find_ancestor_cmd_path(self, cmd, cwd):
    """Search ``cwd`` and its ancestors for ``node_modules/.bin/<cmd>``.

    On Windows a ``.cmd`` extension is appended when missing. Returns the
    first executable candidate, or None once the parent directory is ``/``
    or stops changing.
    """
    current = cwd
    while True:
        node_modules_bin = path.normpath(path.join(current, 'node_modules/.bin/'))
        binary = path.join(node_modules_bin, cmd)

        if sublime.platform() == 'windows' and path.splitext(binary)[1] != '.cmd':
            binary += '.cmd'

        if binary and access(binary, X_OK):
            return binary

        parent = path.normpath(path.join(current, '../'))
        if parent == '/' or parent == current:
            return None
        current = parent
def _clone_test_db(self, number, verbosity, keepdb=False):
    """Copy the test database file to the clone numbered ``number``.

    Nothing is done for in-memory databases. An existing clone is kept
    when ``keepdb`` is true; otherwise it is deleted first. Any error
    while deleting or copying is written to stderr and the process exits
    with status 2.
    """
    source_path = self.connection.settings_dict['NAME']
    clone_path = self.get_test_db_clone_settings(number)['NAME']

    # Forking automatically makes a copy of an in-memory database.
    if self.connection.is_in_memory_db(source_path):
        return

    # Erase the old test database
    if os.access(clone_path, os.F_OK):
        if keepdb:
            return
        if verbosity >= 1:
            print("Destroying old test database for alias %s..." % (
                self._get_database_display_str(verbosity, clone_path),
            ))
        try:
            os.remove(clone_path)
        except Exception as e:
            sys.stderr.write("Got an error deleting the old test database: %s\n" % e)
            sys.exit(2)

    try:
        shutil.copy(source_path, clone_path)
    except Exception as e:
        sys.stderr.write("Got an error cloning the test database: %s\n" % e)
        sys.exit(2)
def which1(program, pathstr=None):
    """Return the path of ``program`` if it is an executable file, else None.

    A name with a directory component is checked as given. A bare name is
    searched for in ``pathstr``, an ``os.pathsep``-separated list of
    directories that defaults to the ``PATH`` environment variable.
    """
    search_path = os.environ["PATH"] if pathstr is None else pathstr

    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    directory = os.path.split(program)[0]
    if directory:
        return program if is_exe(program) else None

    for entry in search_path.split(os.pathsep):
        candidate = os.path.join(entry.strip('"'), program)
        if is_exe(candidate):
            return candidate
    return None
def execd_submodule_paths(command, execd_dir=None):
    """Yield the full path of ``command`` in each exec.d module that has it.

    Only paths that are executable regular files are produced.
    """
    for module_path in execd_module_paths(execd_dir):
        candidate = os.path.join(module_path, command)
        if not os.access(candidate, os.X_OK):
            continue
        if not os.path.isfile(candidate):
            continue
        yield candidate
def hook_mem_access(self, emu, access, address, size, value, user_data):
    """Report a memory read or write through ``self.pprint``.

    Access kinds other than ``UC_MEM_WRITE`` and ``UC_MEM_READ`` are
    ignored. ``emu`` and ``user_data`` are not used.
    """
    if access == unicorn.UC_MEM_WRITE:
        message = "Write: *%#x = %#x (size = %u)"% (address, value, size)
    elif access == unicorn.UC_MEM_READ:
        message = "Read: *%#x (size = %u)" % (address, size)
    else:
        return

    self.pprint(message, "Memory")
    return
def getMappingsFromTable(self):
    """Rebuild ``self._maps`` from the rows of ``self.memory_mapping``.

    Rows without a name cell are skipped. Address and size cells are parsed
    as hexadecimal when ``ishex`` accepts their text, otherwise as decimal.
    A file cell whose text is not a readable path is replaced by None.
    """
    table = self.memory_mapping
    self._maps = []

    def parse_number(cell):
        text = cell.text()
        return int(text, 0x10) if ishex(text) else int(text)

    for row in range(table.rowCount()):
        name = table.item(row, 0)
        if not name:
            continue
        name = name.text()

        address = table.item(row, 1)
        if address:
            address = parse_number(address)

        size = table.item(row, 2)
        if size:
            size = parse_number(size)

        permission = table.item(row, 3)
        if permission:
            permission = permission.text()

        # NOTE(review): the file column is stored as the table cell itself,
        # not as its text - confirm consumers expect that.
        read_from_file = table.item(row, 4)
        if read_from_file and not os.access(read_from_file.text(), os.R_OK):
            read_from_file = None

        self._maps.append([name, address, size, permission, read_from_file])
    return
def loadCode(self, title, filter, run_disassembler):
    """Ask the user for a file and load it.

    Does nothing when the chosen path is not readable. The file is passed
    through ``disassemble_file`` first when ``run_disassembler`` is truthy
    or the name ends in ``.raw``.
    """
    qFile, qFilter = QFileDialog.getOpenFileName(self, title, EXAMPLES_PATH, filter)

    if not os.access(qFile, os.R_OK):
        return

    needs_disassembly = run_disassembler or qFile.endswith(".raw")
    if not needs_disassembly:
        self.loadFile(qFile)
        return

    body = disassemble_file(qFile, self.arch)
    self.loadFile(qFile, data=body)
    return
def can_exe(self, fpath):
    """
    Can test if path exists and is executable

    Returns False for anything that is not a string. Works on Python 2 and
    Python 3: ``basestring`` is used when it exists, ``str`` otherwise (it
    used to raise ``NameError`` on Python 3).
    """
    try:
        string_types = basestring
    except NameError:
        string_types = str

    if not isinstance(fpath, string_types):
        return False
    return isfile(fpath) and access(fpath, X_OK)
def export(self):
    ''' Export every visible layer and layer group of the image as sprites.

    Aborts with an error message when the destination is an existing file
    or its parent is not writable; warns (but continues) when the
    destination directory already exists. Works on a duplicate of the
    original image, which is deleted at the end.
    '''
    destination = os.path.join(self.path, self.name)

    if os.path.isfile(destination):
        show_error_msg('ABORTING!\nDestination is not a folder.\n {path}/{name}'.format(path=self.path, name=self.name))
        return
    if not os.access(self.path, os.W_OK):
        show_error_msg('ABORTING!\nDestination is not a writable.\n {path}'.format(path=self.path))
        return
    if os.path.isdir(destination):
        show_error_msg('Destination exists, I may have overwritten something in {path}/{name}'.format(path=self.path, name=self.name))
    self.mkdir()

    # Loop through visible layers
    self.img = self.original_img.duplicate()
    self.img.undo_group_start()
    for layer in self.img.layers:
        if not layer.visible:
            continue

        name = '{name}.png'.format(name=layer.name)
        pdb.gimp_image_set_active_layer(self.img, layer)
        # Crop and the layer position
        pdb.plug_in_autocrop_layer(self.img, layer)
        z = 0 - pdb.gimp_image_get_item_position(self.img, layer)

        if not isinstance(layer, gimp.GroupLayer):
            self.sprites.append(self.export_sprite(layer, name, layer.offsets, z))
        elif len(layer.children) > 0:
            # Empty groups produce no sprite at all.
            self.sprites.append(self.export_sprite_sheet(layer, name, layer.offsets, z))

    self.write_json()
    self.img.undo_group_end()
    pdb.gimp_image_delete(self.img)
def is_writable(self, path):
    """Report whether ``path`` or its nearest existing ancestor is writable.

    Walks up the directory chain until an existing entry is found and
    returns the result of the write-access check on it. Returns False when
    no ancestor exists.
    """
    candidate = path
    while True:
        if os.path.exists(candidate):
            return os.access(candidate, os.W_OK)
        parent = os.path.dirname(candidate)
        if parent == candidate:
            # dirname() no longer changes the path: nothing left to try.
            return False
        candidate = parent
def get_cache_base(suffix=None):
    """
    Return the default base location for distlib caches.

    The parent directory is ``%LOCALAPPDATA%`` on Windows when that
    variable is defined, and the user's home directory
    (``os.path.expanduser('~')``) otherwise. If the parent does not exist
    an attempt is made to create it. When the parent cannot be created or
    is not writable, a fresh temporary directory from
    ``tempfile.mkdtemp()`` is used instead and a warning is logged.

    The result is the parent joined with ``suffix`` (``'.distlib'`` when
    ``suffix`` is None).
    """
    if suffix is None:
        suffix = '.distlib'

    on_new_windows = os.name == 'nt' and 'LOCALAPPDATA' in os.environ
    if on_new_windows:
        result = os.path.expandvars('$localappdata')
    else:
        # Assume posix, or old Windows
        result = os.path.expanduser('~')

    # we use 'isdir' instead of 'exists', because we want to
    # fail if there's a file with that name
    if not os.path.isdir(result):
        try:
            os.makedirs(result)
        except OSError:
            logger.warning('Unable to create %s', result, exc_info=True)
            usable = False
        else:
            usable = True
    else:
        usable = os.access(result, os.W_OK)
        if not usable:
            logger.warning('Directory exists but is not writable: %s', result)

    if not usable:
        result = tempfile.mkdtemp()
        logger.warning('Default location unusable, using %s', result)
    return os.path.join(result, suffix)
def extraction_error(self):
    """Give an error message for problems extracting file(s)

    Builds an ``ExtractionError`` describing the exception currently being
    handled and the cache directory in use, attaches ``manager``,
    ``cache_path`` and ``original_error`` attributes to it, and raises it.
    This method never returns normally.

    NOTE(review): the line breaks and indentation inside the message
    template were reconstructed from a source whose newlines were lost;
    confirm against the upstream text before relying on the exact layout.
    """

    # The exception being handled when this method is called (None if
    # there is none).
    old_exc = sys.exc_info()[1]
    cache_path = self.extraction_path or get_default_cache()

    tmpl = textwrap.dedent("""
        Can't extract file(s) to egg cache

        The following error occurred while trying to extract file(s)
        to the Python egg cache:

          {old_exc}

        The Python egg cache directory is currently set to:

          {cache_path}

        Perhaps your account does not have write access to this directory?
        You can change the cache directory by setting the PYTHON_EGG_CACHE
        environment variable to point to an accessible directory.
        """).lstrip()
    # The template fields are filled from the local variables of the same
    # name, so ``old_exc`` and ``cache_path`` must keep their names.
    err = ExtractionError(tmpl.format(**locals()))
    err.manager = self
    err.cache_path = cache_path
    err.original_error = old_exc
    raise err
def _mkstemp_inner(dir, pre, suf, flags, output_type):
    """Shared implementation behind mkstemp, TemporaryFile and NamedTemporaryFile.

    Tries up to TMP_MAX candidate names in ``dir`` and returns
    ``(fd, absolute_path)`` for the first one that can be opened with
    ``flags`` and mode 0o600. Raises FileExistsError when every candidate
    is taken.
    """
    candidates = _get_candidate_names()
    if output_type is bytes:
        candidates = map(_os.fsencode, candidates)

    for _attempt in range(TMP_MAX):
        name = next(candidates)
        file = _os.path.join(dir, pre + name + suf)
        try:
            fd = _os.open(file, flags, 0o600)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            name_clash = (_os.name == 'nt' and _os.path.isdir(dir) and
                          _os.access(dir, _os.W_OK))
            if not name_clash:
                raise
            continue
        else:
            return (fd, _os.path.abspath(file))

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary file name found")

# User visible interfaces.
def mkdtemp(suffix=None, prefix=None, dir=None):
    """Create a unique temporary directory and return its pathname.

    Arguments are as for mkstemp, except that the 'text' argument is
    not accepted.

    The directory is created with mode 0o700, i.e. readable, writable and
    searchable only by the creating user.

    Deleting the directory when done with it is the caller's job.
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    candidates = _get_candidate_names()
    if output_type is bytes:
        candidates = map(_os.fsencode, candidates)

    for _attempt in range(TMP_MAX):
        name = next(candidates)
        file = _os.path.join(dir, prefix + name + suffix)
        try:
            _os.mkdir(file, 0o700)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            name_clash = (_os.name == 'nt' and _os.path.isdir(dir) and
                          _os.access(dir, _os.W_OK))
            if not name_clash:
                raise
            continue
        else:
            return file

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary directory name found")