The following 50 code examples, extracted from open-source Python projects, illustrate how to use os.R_OK.
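All of the examples share the same basic pattern: call os.access() with the os.R_OK flag to check whether the current (real) user can read a path before opening it, and take an error path when the check fails. As a minimal, self-contained sketch of that pattern (the helper name and the path below are made up purely for illustration; note that os.access() tests against the real rather than the effective UID/GID, and the result can go stale between the check and the open, so most callers still handle the exception raised by open()):

import os


def is_readable_file(path):
    """Return True if path points to a regular file the current user can read."""
    # os.R_OK asks only about read permission; os.F_OK, os.W_OK and os.X_OK
    # cover existence, write and execute checks respectively.
    return os.path.isfile(path) and os.access(path, os.R_OK)


if __name__ == '__main__':
    candidate = '/etc/hostname'   # hypothetical path, purely for illustration
    if is_readable_file(candidate):
        with open(candidate) as f:
            print(f.read().strip())
    else:
        print('cannot read %s' % candidate)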
def copy_from_host(module):
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))

    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    mode = oct(os.stat(src).st_mode & 0o777)

    with open(src, 'rb') as f:
        raw_data = f.read()

    sha1 = hashlib.sha1(raw_data).hexdigest()
    data = zlib.compress(raw_data) if compress else raw_data

    module.exit_json(content=base64.b64encode(data), sha1=sha1,
                     mode=mode, source=src)
def populate_memory(self, areas):
    for name, address, size, permission, input_file in areas:
        perm = self.unicorn_permissions(permission)
        self.vm.mem_map(address, size, perm)
        self.areas[name] = [address, size, permission,]

        msg = "Map %s @%x (size=%d,perm=%s)" % (name, address, size, permission)
        if input_file is not None and os.access(input_file, os.R_OK):
            code = open(input_file, 'rb').read()
            self.vm.mem_write(address, bytes(code[:size]))
            msg += " and content from '%s'" % input_file

        self.log(msg, "Setup")

    self.start_addr = self.areas[".text"][0]
    self.end_addr = -1
    return True
def _test_NoAccessDir(self, nodeName):
    devBooter, devMgr = self.launchDeviceManager("/nodes/%s/DeviceManager.dcd.xml" % nodeName)
    device = devMgr._get_registeredDevices()[0]
    fileMgr = self._domMgr._get_fileMgr()
    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0000)
    else:
        os.chmod(testdir, 0000)

    try:
        self.assertFalse(os.access(testdir, os.R_OK|os.X_OK), 'Current user can still access directory')
        self.assertRaises(CF.LoadableDevice.LoadFail, device.load, fileMgr, dirname,
                          CF.LoadableDevice.SHARED_LIBRARY)
    finally:
        os.rmdir(testdir)
def test_ExistsException(self):
    self.assertNotEqual(self._domMgr, None)
    fileMgr = self._domMgr._get_fileMgr()

    # Makes sure that FileSystem::exists() throws correct exception and
    # doesn't kill domain for files in directories it cannot access
    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0644)
    else:
        os.chmod(testdir, 0644)

    try:
        self.assertFalse(os.access(testdir, os.R_OK|os.X_OK), 'Current user can still access directory')
        self.assertRaises(CF.InvalidFileName, fileMgr.exists, os.path.join(dirname, 'testfile'))
    finally:
        os.rmdir(testdir)
def check_fastq(fastq):
    # Check if fastq is readable
    if not os.access(fastq, os.R_OK):
        martian.exit("Do not have file read permission for FASTQ file: %s" % fastq)

    # Check if fastq is gzipped
    is_gzip_fastq = True
    try:
        with gzip.open(fastq) as f:
            f.read(1)
    except:
        is_gzip_fastq = False

    if is_gzip_fastq and not fastq.endswith(cr_constants.GZIP_SUFFIX):
        martian.exit("Input FASTQ file is gzipped but filename does not have %s suffix: %s" % (cr_constants.GZIP_SUFFIX, fastq))
    if not is_gzip_fastq and fastq.endswith(cr_constants.GZIP_SUFFIX):
        martian.exit("Input FASTQ file is not gzipped but filename has %s suffix: %s" % (cr_constants.GZIP_SUFFIX, fastq))
def find_boot_files(name, shortname, basedir):
    # find vmlinuz or initrd
    if name:
        fullpath = name if name[0] == '/' else basedir + '/boot/' + name
    else:
        # try the (only) symlink at the root directory
        try1 = basedir + '/' + shortname + '*'
        found = sorted(glob.glob(try1))
        if len(found) >= 1 and os.access(found[0], os.R_OK):
            fullpath = os.path.realpath(found[0])
        else:
            # try the highest numbered version at /boot
            try2 = basedir + '/boot/' + shortname + '*'
            found = sorted(glob.glob(try2))
            if len(found) < 1:
                sys.exit('cannot read ' + try1 + ' and cannot find ' + try2)
            fullpath = found[-1]
            if len(found) > 1:
                warnings.warn('found more than one ' + try2 + ' , using ' + fullpath)
    if not os.access(fullpath, os.R_OK):
        sys.exit('failed to read ' + fullpath)
    return fullpath
def check_access(filename, write_required=True):
    """
    Checks if the user has read and optionally write access to the specified
    file. Uses ACLs first and POSIX file permissions if ACLs cannot be used.
    Returns True only if the user has all required access rights.
    """
    if HAVE_POSIX1E:
        for pset in posix1e.ACL(file=filename):
            if pset.tag_type == posix1e.ACL_USER and pset.qualifier == os.geteuid():
                if pset.permset.test(posix1e.ACL_READ) and (not write_required or pset.permset.test(posix1e.ACL_WRITE)):
                    return True
            if pset.tag_type == posix1e.ACL_GROUP and pset.qualifier in os.getgroups():
                if pset.permset.test(posix1e.ACL_READ) and (not write_required or pset.permset.test(posix1e.ACL_WRITE)):
                    return True
    if write_required:
        return os.access(filename, os.R_OK | os.W_OK)
    return os.access(filename, os.R_OK)
def checkUSBStick(self):
    self.target_dir = None
    allpartitions = [(r.description, r.mountpoint) for r in harddiskmanager.getMountedPartitions(onlyhotplug=True)]
    print "[checkUSBStick] found partitions:", allpartitions
    usbpartition = []
    for x in allpartitions:
        print x, x[1] == '/', x[0].find("USB"), access(x[1], R_OK)
        if x[1] != '/' and x[0].find("USB") > -1:  # and access(x[1], R_OK) is True:
            usbpartition.append(x)
    print usbpartition
    if len(usbpartition) == 1:
        self.target_dir = usbpartition[0][1]
        self.md5_passback = self.getFeed
        self.md5_failback = self.askStartWizard
        self.md5verify(self.stickimage_md5, self.target_dir)
    elif not usbpartition:
        print "[NFIFlash] needs to create usb flasher stick first!"
        self.askStartWizard()
    else:
        self.askStartWizard()
def __new__(cls, filename):
    files = []
    timestamp = 0
    for path in process.get_config_directories():
        config_file = os.path.realpath(os.path.join(path, filename))
        if config_file not in files and os.access(config_file, os.R_OK):
            try:
                timestamp = max(timestamp, os.stat(config_file).st_mtime)
            except (OSError, IOError):
                continue
            files.append(config_file)
    instance = cls.instances.get(filename, None)
    if instance is None or instance.files != files or instance.timestamp < timestamp:
        instance = object.__new__(cls)
        instance.parser = SafeConfigParser()
        instance.parser.optionxform = lambda x: x.replace('-', '_')
        instance.files = instance.parser.read(files)
        instance.filename = filename
        instance.timestamp = timestamp
        cls.instances[filename] = instance
    return instance
def effectivelyReadable(self):
    uid = os.getuid()
    euid = os.geteuid()
    gid = os.getgid()
    egid = os.getegid()

    # This is probably true most of the time, so just let os.access()
    # handle it. Avoids potential bugs in the rest of this function.
    if uid == euid and gid == egid:
        return os.access(self.name, os.R_OK)

    st = os.stat(self.name)

    # This may be wrong depending on the semantics of your OS.
    # i.e. if the file is -------r--, does the owner have access or not?
    if st.st_uid == euid:
        return st.st_mode & stat.S_IRUSR != 0

    # See comment for UID check above.
    groups = os.getgroups()
    if st.st_gid == egid or st.st_gid in groups:
        return st.st_mode & stat.S_IRGRP != 0

    return st.st_mode & stat.S_IROTH != 0
def cfndiff_module_validation(module):
    ''' Validate for correct module call/usage in ansible. '''
    # Boto3 is required!
    if not HAS_BOTO3:
        module.fail_json(msg='boto3 is required. Try pip install boto3')
    # cfn_flip is required!
    if not HAS_CFN_FLIP:
        module.fail_json(msg='cfn_flip is required. Try pip install cfn_flip')

    template = module.params['template']
    b_template = to_bytes(template, errors='surrogate_or_strict')

    # Validate path of template
    if not os.path.exists(b_template):
        module.fail_json(msg="template %s not found" % (template))
    if not os.access(b_template, os.R_OK):
        module.fail_json(msg="template %s not readable" % (template))
    if os.path.isdir(b_template):
        module.fail_json(msg="diff does not support recursive diff of directory: %s" % (template))

    return module
def get_pip_requirements(fname=os.path.join(dingdangpath.LIB_PATH, 'requirements.txt')):
    """
    Gets the PIP requirements from a text file. If the file does not exist
    or is not readable, it returns None.

    Arguments:
        fname -- (optional) the requirement text file
                 (Default: "client/requirements.txt")

    Returns:
        A list of pip requirement objects or None
    """
    logger = logging.getLogger(__name__)
    if os.access(fname, os.R_OK):
        reqs = list(pip.req.parse_requirements(fname))
        logger.debug("Found %d PIP requirements in file '%s'", len(reqs), fname)
        return reqs
    else:
        logger.debug("PIP requirements file '%s' not found or not readable", fname)
def _setup_image(self, image_path):
    """
    Load the image located at the specified path

    @type image_path: str
    @param image_path: the relative or absolute file path to the image file

    @rtype: sensor_msgs/Image or None
    @return: sensor_msgs/Image if the image is convertible and None otherwise
    """
    if not os.access(image_path, os.R_OK):
        rospy.logerr("Cannot read file at '{0}'".format(image_path))
        return None

    img = cv2.imread(image_path)

    # Return msg
    return cv_bridge.CvBridge().cv2_to_imgmsg(img, encoding="bgr8")
def copydir(self, path):
    """
    Copy the contents of the local directory given by path to google cloud.
    Maintain the same directory structure on remote.

    This is (intentionally) a blocking call, so clients can report errors
    if the transfer fails.

    :type path: string
    :param path: relative or absolute path to the directory that needs to be copied
    :return: True when transfer is complete
    :raises OSError: path doesn't exist or permission denied
    :raises ValueError: if the library cannot determine the file size
    :raises gcloud.exceptions.GCloudError: if upload status gives error response
    """
    if not os.access(path, os.R_OK):
        raise OSError('Permission denied')

    for filename in find_files(path):
        blob = Blob(filename, self)
        blob.upload_from_filename(filename)
    return True
def register(self, name, URI):
    origname, name = name, self.validateName(name)
    URI = self.validateURI(URI)
    fn = self.translate(name)
    self.lock.acquire()
    try:
        if os.access(fn, os.R_OK):
            Log.msg('NameServer', 'name already exists:', name)
            raise Pyro.errors.NamingError('name already exists', name)
        try:
            open(fn, 'w').write(str(URI) + '\n')
            self._dosynccall("register", origname, URI)
            Log.msg('NameServer', 'registered', name, 'with URI', str(URI))
        except IOError, x:
            if x.errno == errno.ENOENT:
                raise Pyro.errors.NamingError('(parent)group not found')
            elif x.errno == errno.ENOTDIR:
                raise Pyro.errors.NamingError('parent is no group')
            else:
                raise Pyro.errors.NamingError(str(x))
    finally:
        self.lock.release()
def deleteGroup(self, groupname):
    groupname = self.validateName(groupname)
    if groupname == ':':
        Log.msg('NameServer', 'attempt to deleteGroup root group')
        raise Pyro.errors.NamingError('not allowed to delete root group')
    dirnam = self.translate(groupname)
    self.lock.acquire()
    try:
        if not os.access(dirnam, os.R_OK):
            raise Pyro.errors.NamingError('group not found', groupname)
        try:
            shutil.rmtree(dirnam)
            self._dosynccall("deleteGroup", groupname)
            Log.msg('NameServer', 'deleted group', groupname)
        except OSError, x:
            if x.errno == errno.ENOENT:
                raise Pyro.errors.NamingError('group not found', groupname)
            elif x.errno == errno.ENOTDIR:
                raise Pyro.errors.NamingError('is no group', groupname)
            else:
                raise Pyro.errors.NamingError(str(x))
    finally:
        self.lock.release()
def set_imageinfo(memoryFilePath):
    path = r'{}'.format(memoryFilePath)
    path = os.path.abspath(path)
    try:
        if os.access(path, os.F_OK):
            if os.access(path, os.R_OK):
                cwd = os.getcwd()
                imageinfo = ['vol.py', '-f', '{}'.format(path), 'imageinfo',
                             '--output=text',
                             '--output-file={}'.format(os.path.join(cwd, 'imageinfo.text'))]
                return imageinfo
            else:
                print '\n[!] Error File Permissions: No Read Access for {}\n'.format(path)
        else:
            print '\n[!] Error FilePath: Does not exist {}\n'.format(path)
    except Exception as set_imageinfo_error:
        print '[!] EXCEPTION ERROR: < set_imageinfo > function'
        print set_imageinfo_error
def check_local_file_valid(self, local_path):
    """Check that the local file exists, is a regular file, and is readable.

    :param local_path:
    :return:
    """
    if not os.path.exists(local_path):
        self._err_tips = 'local_file %s not exist!' % local_path
        return False
    if not os.path.isfile(local_path):
        self._err_tips = 'local_file %s is not regular file!' % local_path
        return False
    if not os.access(local_path, os.R_OK):
        self._err_tips = 'local_file %s is not readable!' % local_path
        return False
    return True
def check_file_freshness(filename, newer_than=3600):
    """
    Check a file exists, is readable and is newer than <n> seconds (where
    <n> defaults to 3600).
    """
    # First check the file exists and is readable
    if not os.path.exists(filename):
        raise CriticalError("%s: does not exist." % (filename))
    if os.access(filename, os.R_OK) == 0:
        raise CriticalError("%s: is not readable." % (filename))

    # Then ensure the file is up-to-date enough
    mtime = os.stat(filename).st_mtime
    last_modified = time.time() - mtime
    if last_modified > newer_than:
        raise CriticalError("%s: was last modified on %s and is too old (> %s seconds)."
                            % (filename, time.ctime(mtime), newer_than))
    if last_modified < 0:
        raise CriticalError("%s: was last modified on %s which is in the future."
                            % (filename, time.ctime(mtime)))
def validate_args(self, args: configargparse.Namespace) -> None:
    _docker_args.validate_shared_args(args)

    if not args.docker_buildfile:
        raise ErrorMessage("Using the docker builder requires you to specify a Dockerfile template via "
                           "--docker-buildfile.")

    if not os.path.exists(args.docker_buildfile) or not os.access(args.docker_buildfile, os.R_OK):
        raise ErrorMessage("It seems that GoPythonGo can't find or isn't allowed to read %s" %
                           highlight(args.docker_buildfile))

    for arg in args.docker_buildargs:
        if "=" not in arg:
            raise ErrorMessage("A Docker build arg must be in the form 'key=value'. Consult the %s "
                               "documentation for more information. '%s' does not contain a '='." %
                               (highlight("docker build"), arg))

    for var in args.dockerfile_vars:
        if "=" not in var:
            raise ErrorMessage("A Dockerfile Jinja template context variable must be in the form 'key=value'. "
                               "'%s' does not contain a '='" % var)
def user_password(self):
    passwd = ''
    if HAVE_SPWD:
        try:
            passwd = spwd.getspnam(self.name)[1]
        except KeyError:
            return passwd

    if not self.user_exists():
        return passwd
    elif self.SHADOWFILE:
        # Read shadow file for user's encrypted password string
        if os.path.exists(self.SHADOWFILE) and os.access(self.SHADOWFILE, os.R_OK):
            for line in open(self.SHADOWFILE).readlines():
                if line.startswith('%s:' % self.name):
                    passwd = line.split(':')[1]
    return passwd
def get_pid_location(module):
    """
    Try to find a pid directory in the common locations, falling
    back to the user's home directory if no others exist
    """
    for dir in ['/var/run', '/var/lib/run', '/run', os.path.expanduser("~/")]:
        try:
            if os.path.isdir(dir) and os.access(dir, os.R_OK|os.W_OK):
                return os.path.join(dir, '.accelerate.pid')
        except:
            pass
    module.fail_json(msg="couldn't find any valid directory to use for the accelerate pid file")


# NOTE: this shares a fair amount of code in common with async_wrapper, if async_wrapper were a new module we could move
# this into utils.module_common and probably should anyway
def _configure_base(module, base, conf_file, disable_gpg_check):
    """Configure the dnf Base object."""
    conf = base.conf

    # Turn off debug messages in the output
    conf.debuglevel = 0

    # Set whether to check gpg signatures
    conf.gpgcheck = not disable_gpg_check

    # Don't prompt for user confirmations
    conf.assumeyes = True

    # Change the configuration file path if provided
    if conf_file:
        # Fail if we can't read the configuration file.
        if not os.access(conf_file, os.R_OK):
            module.fail_json(
                msg="cannot read configuration file", conf_file=conf_file)
        else:
            conf.config_file_path = conf_file

    # Read the configuration file
    conf.read()
def get_file_content(path, default=None, strip=True):
    data = default
    if os.path.exists(path) and os.access(path, os.R_OK):
        try:
            try:
                datafile = open(path)
                data = datafile.read()
                if strip:
                    data = data.strip()
                if len(data) == 0:
                    data = default
            finally:
                datafile.close()
        except:
            # ignore errors as some jails/containers might have readable permissions but not allow reads to proc
            # done in 2 blocks for 2.4 compat
            pass
    return data
def getMappingsFromTable(self):
    self._maps = []
    sz = self.memory_mapping.rowCount()
    for i in range(sz):
        name = self.memory_mapping.item(i, 0)
        if not name:
            continue
        name = name.text()

        address = self.memory_mapping.item(i, 1)
        if address:
            if ishex(address.text()):
                address = int(address.text(), 0x10)
            else:
                address = int(address.text())

        size = self.memory_mapping.item(i, 2)
        if size:
            size = int(size.text(), 0x10) if ishex(size.text()) else int(size.text())

        permission = self.memory_mapping.item(i, 3)
        if permission:
            permission = permission.text()

        read_from_file = self.memory_mapping.item(i, 4)
        if read_from_file and not os.access(read_from_file.text(), os.R_OK):
            read_from_file = None

        self._maps.append([name, address, size, permission, read_from_file])
    return
def loadCode(self, title, filter, run_disassembler):
    qFile, qFilter = QFileDialog.getOpenFileName(self, title, EXAMPLES_PATH, filter)
    if not os.access(qFile, os.R_OK):
        return

    if run_disassembler or qFile.endswith(".raw"):
        body = disassemble_file(qFile, self.arch)
        self.loadFile(qFile, data=body)
    else:
        self.loadFile(qFile)
    return
def test_get_default_keystore_path(self):
    """
    Checks that the default keystore directory exists or creates it.
    Verifies the path is correct and that we have read/write access to it.
    """
    keystore_dir = PyWalib.get_default_keystore_path()
    if not os.path.exists(keystore_dir):
        os.makedirs(keystore_dir)
    # checks path correctness
    self.assertTrue(keystore_dir.endswith(".config/pyethapp/keystore/"))
    # checks read/write access
    self.assertEqual(os.access(keystore_dir, os.R_OK), True)
    self.assertEqual(os.access(keystore_dir, os.W_OK), True)
def read_file(filename):
    filename_path = os.path.join('/etc/ceph', filename)

    if not os.path.exists(filename_path):
        json_exit("file not found: {}".format(filename_path), failed=True)

    if not os.access(filename_path, os.R_OK):
        json_exit("file not readable: {}".format(filename_path), failed=True)

    with open(filename_path, 'rb') as f:
        raw_data = f.read()

    return {'content': base64.b64encode(zlib.compress(raw_data)),
            'sha1': hashlib.sha1(raw_data).hexdigest(),
            'filename': filename}
def copy_to_host(module):
    compress = module.params.get('compress')
    dest = module.params.get('dest')
    mode = int(module.params.get('mode'), 0)
    sha1 = module.params.get('sha1')
    src = module.params.get('src')

    data = base64.b64decode(src)
    raw_data = zlib.decompress(data) if compress else data

    if sha1:
        if os.path.exists(dest):
            if os.access(dest, os.R_OK):
                with open(dest, 'rb') as f:
                    if hashlib.sha1(f.read()).hexdigest() == sha1:
                        module.exit_json(changed=False)
            else:
                module.exit_json(failed=True, changed=False,
                                 msg='file is not accessible: {}'.format(dest))

        if sha1 != hashlib.sha1(raw_data).hexdigest():
            module.exit_json(failed=True, changed=False,
                             msg='sha1 sum does not match data')

    with os.fdopen(os.open(dest, os.O_WRONLY | os.O_CREAT, mode), 'wb') as f:
        f.write(raw_data)

    module.exit_json(changed=True)
def test_NoWriteCache(self):
    cachedir = os.getcwd() + '/sdr/cache/.BasicTestDevice_node'
    (status, output) = commands.getstatusoutput('mkdir -p ' + cachedir)
    (status, output) = commands.getstatusoutput('chmod 000 ' + cachedir)
    self.assertFalse(os.access(cachedir, os.R_OK|os.W_OK|os.X_OK), 'Current user can still access directory')
    devmgr_nb, devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml")
    self.assertEquals(255, devmgr_nb.returncode)
    self.assertEquals(devMgr, None)
def main(args, outs):
    if not (args.filtered_matrices_h5 and os.path.exists(args.filtered_matrices_h5)):
        martian.exit("Filtered matrices do not exist: %s" % args.filtered_matrices_h5)

    if not os.access(args.filtered_matrices_h5, os.R_OK):
        martian.exit("Filtered matrices file is not readable, please check file permissions: %s" % args.filtered_matrices_h5)

    h5_filetype = cr_utils.get_h5_filetype(args.filtered_matrices_h5)
    if h5_filetype and h5_filetype != cr_matrix.MATRIX_H5_FILETYPE:
        martian.exit("Input is a %s file, but a matrix file is required" % h5_filetype)

    flt_genomes = cr_matrix.GeneBCMatrices.load_genomes_from_h5(args.filtered_matrices_h5)
    if len(flt_genomes) != 1:
        martian.exit("Reanalyzer only supports matrices with one genome. This matrix has: %s" % flt_genomes)
def validate_csv(csv_file, entry_type, entry_colname):
    if not os.path.exists(csv_file):
        martian.exit("Specified %s file does not exist: %s" % (entry_type, csv_file))
    elif not os.access(csv_file, os.R_OK):
        martian.exit("Specified %s file is not readable, please check file permissions: %s" % (entry_type, csv_file))

    with open(csv_file) as f:
        header = f.readline().strip().split(',')
        if header[0] != entry_colname:
            martian.exit("First line of %s file must be a header line, with '%s' as the first column." % (entry_type, entry_colname))

        counts = sum(1 for line in f)  # count remaining lines

    if counts == 0:
        martian.exit("Specified %s file must contain at least one entry." % entry_type)

    return counts
def parse_parameters(filename):
    if filename is None:
        return {}

    if not os.path.exists(filename):
        martian.exit("Parameters file does not exist: %s" % filename)

    if not os.access(filename, os.R_OK):
        martian.exit("Parameters file is not readable, please check file permissions: %s" % filename)

    params = {}
    with open(filename, 'rU') as f:
        # skip comment lines
        ff = filter(lambda row: not row.startswith('#'), f)
        reader = csv.reader(ff)
        for (i, row) in enumerate(reader, start=1):
            if len(row) != 2:
                martian.exit("Row %d is incorrectly formatted (must have exactly 2 columns)" % i)

            name = row[0].strip().lower()
            value = row[1].strip()

            if name not in ANALYSIS_PARAMS:
                martian.exit("Unrecognized parameter: %s" % name)

            if name in params:
                martian.exit("Cannot specify the same parameter twice: %s" % name)

            required_type = ANALYSIS_PARAMS[name]
            try:
                cast_value = required_type(value)
                params[name] = cast_value
            except ValueError:
                martian.exit("Parameter %s could not be cast to the required type: %s" % (name, required_type))

    return params
def check_runinfo_xml(folder_path):
    """
    :return: path to valid RunInfo.xml in folder_path
    :rtype: string
    """
    hostname = socket.gethostname()

    check_folder("sequencing run", folder_path, hostname)

    runinfo = os.path.join(folder_path, "RunInfo.xml")
    if not os.path.exists(runinfo):
        martian.exit("On machine: %s, RunInfo.xml not found. Cannot verify run was 10X-prepped." % hostname)

    if not os.access(runinfo, os.R_OK):
        martian.exit("On machine: %s, insufficient permission to open RunInfo.xml." % hostname)

    return runinfo
def __init__(self, cloudpath, mip=0, bounded=True, fill_missing=False,
             cache=False, cdn_cache=True, progress=INTERACTIVE,
             info=None, provenance=None):
    self.path = lib.extract_path(cloudpath)

    self.progress = progress
    self.mip = mip
    self.bounded = bounded
    self.fill_missing = fill_missing
    self.cache = cache
    self.cdn_cache = cdn_cache

    if self.cache:
        if not os.path.exists(self.cache_path):
            mkdir(self.cache_path)
        if not os.access(self.cache_path, os.R_OK|os.W_OK):
            raise IOError('Cache directory needs read/write permission: ' + self.cache_path)

    if info is None:
        self.refresh_info()
        if self.cache:
            self._check_cached_info_validity()
    else:
        self.info = info

    if provenance is None:
        self.provenance = None
        self.refresh_provenance()
        self._check_cached_provenance_validity()
    else:
        self.provenance = self._cast_provenance(provenance)

    try:
        self.mip = self.available_mips[self.mip]
    except:
        raise Exception("MIP {} has not been generated.".format(self.mip))
def check_readable(f):
    """Checks that the path exists and is readable"""
    if not os.path.exists(f) or not os.access(f, os.R_OK):
        raise PyseederException("Error accessing path: {}".format(f))
def test_generateScriptEnv(self):
    filename = "/tmp/test.sh"
    testFile = "/tmp/test.txt"
    cmds = """
X="${ENV_TEST}"
touch "${X}"
exit 0
"""
    generateScript(filename, cmds, {"ENV_TEST": testFile})

    # check permissions
    self.assertTrue(os.access(filename, os.F_OK))
    self.assertTrue(os.access(filename, os.X_OK | os.R_OK))

    # execute the script and expect it to create the file
    status = subprocess.call([filename])
    self.assertEqual(status, 0)

    # check that the testFile got created by the script
    self.assertTrue(os.access(testFile, os.F_OK))

    # cleanup
    os.remove(filename)
    os.remove(testFile)
def _select_config_file_path():
    """
    Return an openATTIC configuration pathname
    """
    possible_paths = ("/etc/default/openattic", "/etc/sysconfig/openattic")
    for path in possible_paths:
        if os.access(path, os.F_OK) and os.access(path, os.R_OK | os.W_OK):
            return path
    raise CommandExecutionError(
        ("No openATTIC config file found in the following locations: "
         "{}".format(possible_paths)))
def find_on_path(importer, path_item, only=False):
    """Yield distributions accessible on a sys.path directory"""
    path_item = _normalize_cached(path_item)

    if os.path.isdir(path_item) and os.access(path_item, os.R_OK):
        if path_item.lower().endswith('.egg'):
            # unpacked egg
            yield Distribution.from_filename(
                path_item, metadata=PathMetadata(
                    path_item, os.path.join(path_item, 'EGG-INFO')
                )
            )
        else:
            # scan for .egg and .egg-info in directory
            for entry in os.listdir(path_item):
                lower = entry.lower()
                if lower.endswith('.egg-info') or lower.endswith('.dist-info'):
                    fullpath = os.path.join(path_item, entry)
                    if os.path.isdir(fullpath):
                        # egg-info directory, allow getting metadata
                        metadata = PathMetadata(path_item, fullpath)
                    else:
                        metadata = FileMetadata(fullpath)
                    yield Distribution.from_location(
                        path_item, entry, metadata, precedence=DEVELOP_DIST
                    )
                elif not only and lower.endswith('.egg'):
                    for dist in find_distributions(os.path.join(path_item, entry)):
                        yield dist
                elif not only and lower.endswith('.egg-link'):
                    entry_file = open(os.path.join(path_item, entry))
                    try:
                        entry_lines = entry_file.readlines()
                    finally:
                        entry_file.close()
                    for line in entry_lines:
                        if not line.strip():
                            continue
                        for item in find_distributions(os.path.join(path_item, line.rstrip())):
                            yield item
                        break
def _setup(self):
    """
    sets up the SMIME.SMIME instance and loads the CA certificates store
    """
    smime = SMIME.SMIME()
    st = X509.X509_Store()
    if not os.access(self._certstore, os.R_OK):
        raise VerifierError, "cannot access %s" % self._certstore
    st.load_info(self._certstore)
    smime.set_x509_store(st)
    self._smime = smime
def __init__(self, filename, flag='c', mode=None, format='pickle', *args, **kwds):
    self.flag = flag                    # r=readonly, c=create, or n=new
    self.mode = mode                    # None or an octal triple like 0644
    self.format = format                # 'csv', 'json', or 'pickle'
    self.filename = filename
    if flag != 'n' and os.access(filename, os.R_OK):
        fileobj = open(filename, 'rb' if format == 'pickle' else 'r')
        with fileobj:
            self.load(fileobj)
    dict.__init__(self, *args, **kwds)
def find_on_path(importer, path_item, only=False):
    """Yield distributions accessible on a sys.path directory"""
    path_item = _normalize_cached(path_item)

    if os.path.isdir(path_item) and os.access(path_item, os.R_OK):
        if _is_unpacked_egg(path_item):
            yield Distribution.from_filename(
                path_item, metadata=PathMetadata(
                    path_item, os.path.join(path_item, 'EGG-INFO')
                )
            )
        else:
            # scan for .egg and .egg-info in directory
            for entry in os.listdir(path_item):
                lower = entry.lower()
                if lower.endswith('.egg-info') or lower.endswith('.dist-info'):
                    fullpath = os.path.join(path_item, entry)
                    if os.path.isdir(fullpath):
                        # egg-info directory, allow getting metadata
                        metadata = PathMetadata(path_item, fullpath)
                    else:
                        metadata = FileMetadata(fullpath)
                    yield Distribution.from_location(
                        path_item, entry, metadata, precedence=DEVELOP_DIST
                    )
                elif not only and _is_unpacked_egg(entry):
                    dists = find_distributions(os.path.join(path_item, entry))
                    for dist in dists:
                        yield dist
                elif not only and lower.endswith('.egg-link'):
                    with open(os.path.join(path_item, entry)) as entry_file:
                        entry_lines = entry_file.readlines()
                    for line in entry_lines:
                        if not line.strip():
                            continue
                        path = os.path.join(path_item, line.rstrip())
                        dists = find_distributions(path)
                        for item in dists:
                            yield item
                        break
def convert(self, value, param, ctx):
    rv = value

    is_dash = self.file_okay and self.allow_dash and rv in (b'-', '-')

    if not is_dash:
        if self.resolve_path:
            rv = os.path.realpath(rv)

        try:
            st = os.stat(rv)
        except OSError:
            if not self.exists:
                return self.coerce_path_result(rv)
            self.fail('%s "%s" does not exist.' % (
                self.path_type,
                filename_to_ui(value)
            ), param, ctx)

        if not self.file_okay and stat.S_ISREG(st.st_mode):
            self.fail('%s "%s" is a file.' % (
                self.path_type,
                filename_to_ui(value)
            ), param, ctx)
        if not self.dir_okay and stat.S_ISDIR(st.st_mode):
            self.fail('%s "%s" is a directory.' % (
                self.path_type,
                filename_to_ui(value)
            ), param, ctx)
        if self.writable and not os.access(value, os.W_OK):
            self.fail('%s "%s" is not writable.' % (
                self.path_type,
                filename_to_ui(value)
            ), param, ctx)
        if self.readable and not os.access(value, os.R_OK):
            self.fail('%s "%s" is not readable.' % (
                self.path_type,
                filename_to_ui(value)
            ), param, ctx)

    return self.coerce_path_result(rv)
def find_on_path(importer, path_item, only=False):
    """Yield distributions accessible on a sys.path directory"""
    path_item = _normalize_cached(path_item)

    if os.path.isdir(path_item) and os.access(path_item, os.R_OK):
        if path_item.lower().endswith('.egg'):
            # unpacked egg
            yield Distribution.from_filename(
                path_item, metadata=PathMetadata(
                    path_item, os.path.join(path_item, 'EGG-INFO')
                )
            )
        else:
            # scan for .egg and .egg-info in directory
            for entry in os.listdir(path_item):
                lower = entry.lower()
                if lower.endswith('.egg-info') or lower.endswith('.dist-info'):
                    fullpath = os.path.join(path_item, entry)
                    if os.path.isdir(fullpath):
                        # egg-info directory, allow getting metadata
                        metadata = PathMetadata(path_item, fullpath)
                    else:
                        metadata = FileMetadata(fullpath)
                    yield Distribution.from_location(
                        path_item, entry, metadata, precedence=DEVELOP_DIST
                    )
                elif not only and lower.endswith('.egg'):
                    dists = find_distributions(os.path.join(path_item, entry))
                    for dist in dists:
                        yield dist
                elif not only and lower.endswith('.egg-link'):
                    with open(os.path.join(path_item, entry)) as entry_file:
                        entry_lines = entry_file.readlines()
                    for line in entry_lines:
                        if not line.strip():
                            continue
                        path = os.path.join(path_item, line.rstrip())
                        dists = find_distributions(path)
                        for item in dists:
                            yield item
                        break